diff --git a/database/client.go b/database/client.go index d1365a19f3d94763ae8cc360fd62c53f5de85314..6d745bbb6014123fd774ae0cfc4605e6798cd17b 100644 --- a/database/client.go +++ b/database/client.go @@ -1,66 +1,50 @@ package database import ( + "code.fbi.h-da.de/cocsn/yang-modules/generated/tapi" "errors" "github.com/neo4j/neo4j-go-driver/neo4j" log "github.com/sirupsen/logrus" ) //Database is a database -type Database struct { - driver neo4j.Driver -} - -//PND is a principle network domain -type PND struct { - name string - description string - interfaces []string +type Client struct { + driver neo4j.Driver + tapiClient *tapiDatabaseClient } //NewDatabaseClient creates a database client -func NewDatabaseClient(uri, username, password string, encrypted bool) Database { - driver := createDriver(uri, username, password, encrypted) +func NewDatabaseClient(uri, username, password string, + encrypted bool) (*Client, error) { + + d, err := createDriver(uri, username, password, encrypted) - return Database{ - driver: driver, + if err != nil { + return nil, err } + + return &Client{ + driver: d, + tapiClient: newTapiDatabaseClient(d), + }, nil } //createDriver creates a neo4j.Driver instance -func createDriver(uri, username, password string, encrypted bool) neo4j.Driver { - driver, err := neo4j.NewDriver( +func createDriver(uri, username, password string, encrypted bool) (neo4j.Driver, error) { + return neo4j.NewDriver( uri, neo4j.BasicAuth(username, password, ""), func(c *neo4j.Config) { c.Encrypted = encrypted }, ) - - if err != nil { - log.Info("failed creating database driver:", err) - } - - return driver } -//createSession creates a neo4j.Session -func createSession(driver neo4j.Driver, write bool) neo4j.Session { - var sessionConfig neo4j.SessionConfig - - if write { - sessionConfig = neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite} - } else { - sessionConfig = neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead} - } - - session, err := driver.NewSession(sessionConfig) - - if err != nil { - log.Info(err) - } - - return session +//PND is a principle network domain +type PND struct { + Name string + Description string + Interfaces []string } //storePndTxFunc transaction to store a pnd in the database @@ -94,11 +78,11 @@ func storePndTxFunc(name, description string, interfaces []string) neo4j.Transac } //StorePND stores the given principle network domain -func (d Database) StorePND(pnd *PND) neo4j.Node { - session := createSession(d.driver, true) +func (c Client) StorePND(pnd *PND) neo4j.Node { + session := createSession(c.driver, true) defer session.Close() - result, err := session.WriteTransaction(storePndTxFunc(pnd.name, pnd.description, pnd.interfaces)) + result, err := session.WriteTransaction(storePndTxFunc(pnd.Name, pnd.Description, pnd.Interfaces)) if err != nil { log.Info(err) } @@ -107,245 +91,12 @@ func (d Database) StorePND(pnd *PND) neo4j.Node { return result.(neo4j.Node) } -//RemovePND removes the given principle network domain by id. -func (d Database) RemovePND(id string) {} - -//GetPNDByID gets a specific PND by the given ID. -func (d Database) GetPNDByID(id string) {} - -//GetNodesByLabel gets all nodes that belong to a specific label. -func (d Database) GetNodesByLabel(label string) {} - -//GetNodeByID gets a specific node by ID. -func (d Database) GetNodeByID(id string) {} - -//storeNodeTxFunc transaction to store devices from a json. -//relates them to a specific pnd id. -//returns a slice of added devices -func storeNodeTxFunc(id, name string, pndId int64) neo4j.TransactionWork { - return func(tx neo4j.Transaction) (interface{}, error) { - var nodelist []neo4j.Node - query := - ` - MERGE (device:Device {id: $id}) - ON CREATE SET device.name = $name - WITH device - MATCH (pnd:PND) - WHERE id(pnd) = $pndId - MERGE (device)-[:BELONGS_TO]->(pnd) - RETURN device - ` - - result, err := tx.Run(query, map[string]interface{}{ - "id": id, - "name": name, - "pndId": pndId, - }) - - if err != nil { - //TODO: handle neo4j.isServiceUnavailable() - return nil, err - } - - for result.Next() { - nodelist = append(nodelist, result.Record().GetByIndex(0).(neo4j.Node)) - } - - if err = result.Err(); err != nil { - return nil, err - } - - return nodelist, nil - } -} - -//StoreNode stores the given nodes to the database and adds them to a -//principle networt domain (PND). It is required for a node to belong to a PND. -func (d Database) StoreNode(id, name string) []neo4j.Node { - //TODO: remove this after testing and add own gRPC call for it - testPND := PND{name: "test_PND", description: "very interesting", interfaces: []string{"TAPI", "RESTCONF"}} - pndId := d.StorePND(&testPND).Id() - - session := createSession(d.driver, true) - defer session.Close() - - result, err := session.WriteTransaction(storeNodeTxFunc(id, name, pndId)) - if err != nil { - log.Info(err) - } - - log.Info("added/updated devices (count): ", len(result.([]neo4j.Node))) - return result.([]neo4j.Node) -} - -//RemoveNodes removes the given nodes and their relationships -func (d Database) RemoveNodes(json string) {} - -//RemoveSingleNode removes the given node and their relationship by id. -func (d Database) RemoveSingleNode(id string) {} - -//storeLinkTxFunc transaction to store links from a json. -//creates relation between different devices. -//returns a slice of those created relations. -func storeLinkTxFunc(nep1Id, nep2Id string) neo4j.TransactionWork { - return func(tx neo4j.Transaction) (interface{}, error) { - var relationsList []neo4j.Relationship - query := - // ` - // MATCH (d:Device), (d2:Device) - // WHERE d.id = $nep1Id - // AND d2.id = $nep2Id - // CALL apoc.merge.relationship(d,$layerQualifier,{},{}, d2,{}) - // YIELD rel - // RETURN rel - // ` - ` - MATCH (d:Device), (d2:Device) - WHERE d.id = $nep1Id - AND d2.id = $nep2Id - MERGE (d)-[r:connected]->(d2) - ` - - result, err := tx.Run(query, map[string]interface{}{ - "nep1Id": nep1Id, - "nep2Id": nep2Id, - }) - - if err != nil { - //TODO: handle neo4j.isServiceUnavailable() - return nil, err - } - - for result.Next() { - relationsList = append(relationsList, result.Record().GetByIndex(0).(neo4j.Relationship)) - } - - if err = result.Err(); err != nil { - return nil, err - } - - return relationsList, nil - } -} - -//StoreLink stores the links between nodes -func (d Database) StoreLink(nep1Id, nep2Id string) []neo4j.Relationship { - session := createSession(d.driver, true) - defer session.Close() - - result, err := session.WriteTransaction(storeLinkTxFunc(nep1Id, nep2Id)) - if err != nil { - log.Info(err) - } - - log.Info("added/updated links (count): ", len(result.([]neo4j.Relationship))) - - return result.([]neo4j.Relationship) -} - -//storeNodeEdgePointsTxFunc transaction to store interfaces from a json. -//returns count of added/updated interfaces -func storeNodeEdgePointsTxFunc(json string) neo4j.TransactionWork { - return func(tx neo4j.Transaction) (interface{}, error) { - query := - ` - WITH apoc.convert.fromJsonMap($stringToAdd) - AS value - UNWIND value.data as i - MERGE (interface:Interface {id: i.object_id}) - ON CREATE SET interface.object_type =i.object_type, - interface.localId = i.object_data.` + "`tapi-object-data`.name[0].value," + ` - interface.location = i.object_data.` + "`tapi-object-data`.name[1].value," + ` - interface.` + "`containing-node` = i.object_data.`tapi-object-data`.`containing-node`" + ` - RETURN count(interface) - ` - - result, err := tx.Run(query, map[string]interface{}{ - "stringToAdd": json, - }) - - if err != nil { - //TODO: handle neo4j.isServiceUnavailable() - return nil, err - } - - if result.Next() { - return result.Record().GetByIndex(0), nil - } - - return nil, errors.New("expected a record") - } -} - -//TODO: currently this goes over each and every device/interface and adds -// a interface_of relation. -> do it only for the newly added interfaces +func (c Client) StoreDevice(device interface{}, pndId int64) ([]neo4j.Node, error) { + switch device := device.(type) { + case *tapi.TapiCommon_Context_TopologyContext_Topology_Node: + return c.tapiClient.StoreNode(*device.Uuid, *device.Name["nativeName"].Value, pndId) -//storeNodeEdgePointsRelationTxFunc transaction to create relations between interfaces and devices -//returns count of added/updated relations -func storeNodeEdgePointsRelationTxFunc() neo4j.TransactionWork { - return func(tx neo4j.Transaction) (interface{}, error) { - query := - ` - MATCH (d:Device), (i:Interface) - WHERE d.id = i.` + "`containing-node`" + ` - MERGE (i)-[r:INTERFACE_OF]->(d) - RETURN count(r) - ` - - result, err := tx.Run(query, nil) - - if err != nil { - //TODO: handle neo4j.isServiceUnavailable() - return nil, err - } - - if result.Next() { - return result.Record().GetByIndex(0), nil - } - - return nil, errors.New("expected a record") - } -} - -//StoreNodeEdgePoints stores the given node edge points (interfaces) -func (d Database) StoreNodeEdgePoints(json string) { - session := createSession(d.driver, true) - defer session.Close() - - result, err := session.WriteTransaction(storeNodeEdgePointsTxFunc(json)) - if err != nil { - log.Info(err) - } - - _, err = session.WriteTransaction(storeNodeEdgePointsRelationTxFunc()) - if err != nil { - log.Info(err) + default: + return nil, errors.New("unsupported DeviceType") } - - log.Info("added/updated nodeEdgePoints (count): ", result) -} - -//StoreConnections stores relations between nodes -func (d Database) StoreConnections(json string) {} - -//StoreTopology creates a new network topology node. Can also create a relation -//the new node and a existing one if desired -func StoreTopology() {} - -//RemoveTopology removes the given network topology. This includes the node itself -//aswell as the containing links and relations -func RemoveTopology() {} - -//CreateTopologyRelation creates a relation between two given topologies -func CreateTopologyRelation() {} - -//CreateLink creates a link between two network elements -func CreateLink() {} - -//RemoveLink removes a link between two network elements -func RemoveLink() {} - -//Shutdown closes the connection to the database -func (d Database) Shutdown() error { - return d.driver.Close() } diff --git a/database/helpers.go b/database/helpers.go new file mode 100644 index 0000000000000000000000000000000000000000..5744455b0d3a849e9f9f71e129ecb11d46b56c28 --- /dev/null +++ b/database/helpers.go @@ -0,0 +1,25 @@ +package database + +import ( + "github.com/neo4j/neo4j-go-driver/neo4j" + log "github.com/sirupsen/logrus" +) + +//createSession creates a neo4j.Session +func createSession(driver neo4j.Driver, write bool) neo4j.Session { + var sessionConfig neo4j.SessionConfig + + if write { + sessionConfig = neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite} + } else { + sessionConfig = neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead} + } + + session, err := driver.NewSession(sessionConfig) + + if err != nil { + log.Info(err) + } + + return session +} diff --git a/database/tapiDatabaseClient.go b/database/tapiDatabaseClient.go new file mode 100644 index 0000000000000000000000000000000000000000..4a0e343bec3d1d2f690a08c596e1fafab11651e0 --- /dev/null +++ b/database/tapiDatabaseClient.go @@ -0,0 +1,210 @@ +package database + +import ( + "errors" + "github.com/neo4j/neo4j-go-driver/neo4j" + log "github.com/sirupsen/logrus" +) + +type tapiDatabaseClient struct { + driver neo4j.Driver +} + +//newTapiDatabaseClient creates a new tapiDatabaseClient with the given neo4j.Driver +func newTapiDatabaseClient(d neo4j.Driver) *tapiDatabaseClient { + return &tapiDatabaseClient{ + driver: d, + } +} + +//storeNodeTxFunc transaction to store devices from a json. +//relates them to a specific pnd id. +//returns a slice of added devices +func storeNodeTxFunc(id, name string, pndId int64) neo4j.TransactionWork { + return func(tx neo4j.Transaction) (interface{}, error) { + var nodelist []neo4j.Node + query := + ` + MERGE (device:Device {id: $id}) + ON CREATE SET device.name = $name + WITH device + MATCH (pnd:PND) + WHERE id(pnd) = $pndId + MERGE (device)-[:BELONGS_TO]->(pnd) + RETURN device + ` + + result, err := tx.Run(query, map[string]interface{}{ + "id": id, + "name": name, + "pndId": pndId, + }) + + if err != nil { + //TODO: handle neo4j.isServiceUnavailable() + return nil, err + } + + for result.Next() { + nodelist = append(nodelist, result.Record().GetByIndex(0).(neo4j.Node)) + } + + if err = result.Err(); err != nil { + return nil, err + } + + return nodelist, nil + } +} + +//StoreNode stores the given nodes to the database and adds them to a +//principle networt domain (PND). It is required for a node to belong to a PND. +func (d tapiDatabaseClient) StoreNode(id, name string, pndId int64) ([]neo4j.Node, error) { + //TODO: remove this after testing and add own gRPC call for it + // testPND := PND{name: "test_PND", description: "very interesting", interfaces: []string{"TAPI", "RESTCONF"}} + // pndId := d.StorePND(&testPND).Id() + + session := createSession(d.driver, true) + defer session.Close() + + result, err := session.WriteTransaction(storeNodeTxFunc(id, name, pndId)) + if err != nil { + log.Info(err) + } + + log.Info("added/updated devices (count): ", len(result.([]neo4j.Node))) + return result.([]neo4j.Node), err +} + +//storeLinkTxFunc transaction to store links from a json. +//creates relation between different devices. +//returns a slice of those created relations. +func storeLinkTxFunc(nep1Id, nep2Id string) neo4j.TransactionWork { + return func(tx neo4j.Transaction) (interface{}, error) { + var relationsList []neo4j.Relationship + query := + ` + MATCH (d:Device), (d2:Device) + WHERE d.id = $nep1Id + AND d2.id = $nep2Id + MERGE (d)-[r:connected]->(d2) + ` + + result, err := tx.Run(query, map[string]interface{}{ + "nep1Id": nep1Id, + "nep2Id": nep2Id, + }) + + if err != nil { + //TODO: handle neo4j.isServiceUnavailable() + return nil, err + } + + for result.Next() { + relationsList = append(relationsList, result.Record().GetByIndex(0).(neo4j.Relationship)) + } + + if err = result.Err(); err != nil { + return nil, err + } + + return relationsList, nil + } +} + +//StoreLink stores the links between nodes +func (d tapiDatabaseClient) StoreLink(nep1Id, nep2Id string) []neo4j.Relationship { + session := createSession(d.driver, true) + defer session.Close() + + result, err := session.WriteTransaction(storeLinkTxFunc(nep1Id, nep2Id)) + if err != nil { + log.Info(err) + } + + log.Info("added/updated links (count): ", len(result.([]neo4j.Relationship))) + + return result.([]neo4j.Relationship) +} + +//storeNodeEdgePointsTxFunc transaction to store interfaces from a json. +//returns count of added/updated interfaces +func storeNodeEdgePointsTxFunc(json string) neo4j.TransactionWork { + return func(tx neo4j.Transaction) (interface{}, error) { + query := + ` + WITH apoc.convert.fromJsonMap($stringToAdd) + AS value + UNWIND value.data as i + MERGE (interface:Interface {id: i.object_id}) + ON CREATE SET interface.object_type =i.object_type, + interface.localId = i.object_data.` + "`tapi-object-data`.name[0].value," + ` + interface.location = i.object_data.` + "`tapi-object-data`.name[1].value," + ` + interface.` + "`containing-node` = i.object_data.`tapi-object-data`.`containing-node`" + ` + RETURN count(interface) + ` + + result, err := tx.Run(query, map[string]interface{}{ + "stringToAdd": json, + }) + + if err != nil { + //TODO: handle neo4j.isServiceUnavailable() + return nil, err + } + + if result.Next() { + return result.Record().GetByIndex(0), nil + } + + return nil, errors.New("expected a record") + } +} + +//TODO: currently this goes over each and every device/interface and adds +// a interface_of relation. -> do it only for the newly added interfaces + +//storeNodeEdgePointsRelationTxFunc transaction to create relations between interfaces and devices +//returns count of added/updated relations +func storeNodeEdgePointsRelationTxFunc() neo4j.TransactionWork { + return func(tx neo4j.Transaction) (interface{}, error) { + query := + ` + MATCH (d:Device), (i:Interface) + WHERE d.id = i.` + "`containing-node`" + ` + MERGE (i)-[r:INTERFACE_OF]->(d) + RETURN count(r) + ` + + result, err := tx.Run(query, nil) + + if err != nil { + //TODO: handle neo4j.isServiceUnavailable() + return nil, err + } + + if result.Next() { + return result.Record().GetByIndex(0), nil + } + + return nil, errors.New("expected a record") + } +} + +//StoreNodeEdgePoints stores the given node edge points (interfaces) +func (d tapiDatabaseClient) StoreNodeEdgePoints(json string) { + session := createSession(d.driver, true) + defer session.Close() + + result, err := session.WriteTransaction(storeNodeEdgePointsTxFunc(json)) + if err != nil { + log.Info(err) + } + + _, err = session.WriteTransaction(storeNodeEdgePointsRelationTxFunc()) + if err != nil { + log.Info(err) + } + + log.Info("added/updated nodeEdgePoints (count): ", result) +} diff --git a/nucleus/controller.go b/nucleus/controller.go index 7d0541f881fb8ffd86e653c220bfcef4806eb5c6..5e93f90b2f4b71df83f95975e97cc76cf749bb2d 100644 --- a/nucleus/controller.go +++ b/nucleus/controller.go @@ -26,7 +26,7 @@ type clientConfigs struct { type Core struct { //Assert type with clients[key].(*MCPClient) clients map[string]interfaces.Client - database database.Database + database *database.Client config controllerConfig IsRunning chan bool } @@ -51,8 +51,10 @@ func (c *Core) Init(socket, configFileController, configFileClient string, IsRun } // AttachDatabase connects to the database and passes the connectio to the controller core -func (c *Core) AttachDatabase() { - c.database = database.NewDatabaseClient(c.config.DatabaseSocket, c.config.DatabaseUser, c.config.DatabasePassword, c.config.DatabaseCrypto) +func (c *Core) AttachDatabase() error { + dbClient, err := database.NewDatabaseClient(c.config.DatabaseSocket, c.config.DatabaseUser, c.config.DatabasePassword, c.config.DatabaseCrypto) + c.database = dbClient + return err } // Shutdown waits for the shutdown signal and gracefully shuts down once it arrived @@ -103,7 +105,7 @@ func (c *Core) readClientConfig(configFileClient string) error { return err } for _, client := range clients.Client { - c.clients[client.Identifier] = ciena.NewMCPClient(client.Endpoint, client.Username, client.Password, &c.database, &client) + c.clients[client.Identifier] = ciena.NewMCPClient(client.Endpoint, client.Username, client.Password, c.database, &client) } return nil } diff --git a/nucleus/nucleus-core.go b/nucleus/nucleus-core.go index 4e92cf46abe187399459dfb8aa5428ee450d0af0..e8f6f3be2fdd4d5f4440538dfdbc79573a07a0c6 100644 --- a/nucleus/nucleus-core.go +++ b/nucleus/nucleus-core.go @@ -15,7 +15,7 @@ func StartAndRun(socket, filename string, IsRunningChannel chan bool) { // Init the Core core := Core{ clients: make(map[string]interfaces.Client), - database: database.Database{}, + database: &database.Client{}, } core.Init(socket, filename, "", IsRunningChannel) // Start the GRCP CLI diff --git a/sbi/restconf/client/ciena/client.go b/sbi/restconf/client/ciena/client.go index 82a1efe954af2999e87cfbd66f7ee2422fc12e8c..2fd4d737dfdb433936eba1905da55d0e1e7dfd16 100644 --- a/sbi/restconf/client/ciena/client.go +++ b/sbi/restconf/client/ciena/client.go @@ -23,7 +23,7 @@ import ( type MCPClient struct { transport *httptransport.Runtime client *apiclient.ServiceTopologyTAPI - database *database.Database + database *database.Client buffer *bytes.Buffer config *interfaces.ClientConfig device ygot.GoStruct @@ -36,7 +36,7 @@ func (c MCPClient) GetConfig() interfaces.ClientConfig { } //NewMCPClient creates a Ciena flavores TAPI client -func NewMCPClient(endpoint, username, password string, database *database.Database, config *interfaces.ClientConfig) *MCPClient { +func NewMCPClient(endpoint, username, password string, database *database.Client, config *interfaces.ClientConfig) *MCPClient { // create the transport transport := httptransport.New(endpoint, "/", nil) transport.Transport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} @@ -119,7 +119,8 @@ func (c *MCPClient) GetLinks() error { } count++ } - c.database.StoreLink(nep1, nep2) + //c.database.StoreLink(nep1, nep2) + log.Info(nep1, nep2) } log.Debug(c.buffer.Next(25)) @@ -135,6 +136,9 @@ func (c *MCPClient) GetNodes() error { return err } + testPND := database.PND{Name: "test_PND", Description: "very interesting", Interfaces: []string{"TAPI", "RESTCONF"}} + pndId := c.database.StorePND(&testPND).Id() + json := gjson.Get(c.buffer.String(), c.config.GjsonDefaultPath) for _, jsonEntry := range json.Array() { @@ -145,7 +149,8 @@ func (c *MCPClient) GetNodes() error { //lot of them. log.Info(err) } - c.database.StoreNode(*dest.Uuid, *dest.Name["nativeName"].Value) + c.database.StoreDevice(dest, pndId) + // c.database.StoreNode(*dest.Uuid, *dest.Name["nativeName"].Value) } log.Debug(c.buffer.Next(25)) @@ -165,7 +170,7 @@ func (c *MCPClient) GetNodeEdgePoints() error { if err := tapi.Unmarshal(c.buffer.Bytes(), dest); err != nil { return err } - c.database.StoreNodeEdgePoints(c.buffer.String()) + // c.database.StoreNodeEdgePoints(c.buffer.String()) log.Debug(c.buffer.Next(25)) return err }