Skip to content
Snippets Groups Projects
Commit f37d49c8 authored by Malte Bauch's avatar Malte Bauch
Browse files

first steps moving database to a facade pattern

parent 44f0504a
No related branches found
No related tags found
1 merge request!81Draft: Resolve "update database for ygot"
Pipeline #55840 passed with warnings
This commit is part of merge request !81. Comments created here will be created in the context of that merge request.
package database package database
import ( import (
"code.fbi.h-da.de/cocsn/yang-modules/generated/tapi"
"errors" "errors"
"github.com/neo4j/neo4j-go-driver/neo4j" "github.com/neo4j/neo4j-go-driver/neo4j"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
//Database is a database //Database is a database
type Database struct { type Client struct {
driver neo4j.Driver driver neo4j.Driver
} tapiClient *tapiDatabaseClient
//PND is a principle network domain
type PND struct {
name string
description string
interfaces []string
} }
//NewDatabaseClient creates a database client //NewDatabaseClient creates a database client
func NewDatabaseClient(uri, username, password string, encrypted bool) Database { func NewDatabaseClient(uri, username, password string,
driver := createDriver(uri, username, password, encrypted) encrypted bool) (*Client, error) {
d, err := createDriver(uri, username, password, encrypted)
return Database{ if err != nil {
driver: driver, return nil, err
} }
return &Client{
driver: d,
tapiClient: newTapiDatabaseClient(d),
}, nil
} }
//createDriver creates a neo4j.Driver instance //createDriver creates a neo4j.Driver instance
func createDriver(uri, username, password string, encrypted bool) neo4j.Driver { func createDriver(uri, username, password string, encrypted bool) (neo4j.Driver, error) {
driver, err := neo4j.NewDriver( return neo4j.NewDriver(
uri, uri,
neo4j.BasicAuth(username, password, ""), neo4j.BasicAuth(username, password, ""),
func(c *neo4j.Config) { func(c *neo4j.Config) {
c.Encrypted = encrypted c.Encrypted = encrypted
}, },
) )
if err != nil {
log.Info("failed creating database driver:", err)
}
return driver
} }
//createSession creates a neo4j.Session //PND is a principle network domain
func createSession(driver neo4j.Driver, write bool) neo4j.Session { type PND struct {
var sessionConfig neo4j.SessionConfig Name string
Description string
if write { Interfaces []string
sessionConfig = neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}
} else {
sessionConfig = neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead}
}
session, err := driver.NewSession(sessionConfig)
if err != nil {
log.Info(err)
}
return session
} }
//storePndTxFunc transaction to store a pnd in the database //storePndTxFunc transaction to store a pnd in the database
...@@ -94,11 +78,11 @@ func storePndTxFunc(name, description string, interfaces []string) neo4j.Transac ...@@ -94,11 +78,11 @@ func storePndTxFunc(name, description string, interfaces []string) neo4j.Transac
} }
//StorePND stores the given principle network domain //StorePND stores the given principle network domain
func (d Database) StorePND(pnd *PND) neo4j.Node { func (c Client) StorePND(pnd *PND) neo4j.Node {
session := createSession(d.driver, true) session := createSession(c.driver, true)
defer session.Close() defer session.Close()
result, err := session.WriteTransaction(storePndTxFunc(pnd.name, pnd.description, pnd.interfaces)) result, err := session.WriteTransaction(storePndTxFunc(pnd.Name, pnd.Description, pnd.Interfaces))
if err != nil { if err != nil {
log.Info(err) log.Info(err)
} }
...@@ -107,245 +91,12 @@ func (d Database) StorePND(pnd *PND) neo4j.Node { ...@@ -107,245 +91,12 @@ func (d Database) StorePND(pnd *PND) neo4j.Node {
return result.(neo4j.Node) return result.(neo4j.Node)
} }
//RemovePND removes the given principle network domain by id. func (c Client) StoreDevice(device interface{}, pndId int64) ([]neo4j.Node, error) {
func (d Database) RemovePND(id string) {} switch device := device.(type) {
case *tapi.TapiCommon_Context_TopologyContext_Topology_Node:
//GetPNDByID gets a specific PND by the given ID. return c.tapiClient.StoreNode(*device.Uuid, *device.Name["nativeName"].Value, pndId)
func (d Database) GetPNDByID(id string) {}
//GetNodesByLabel gets all nodes that belong to a specific label.
func (d Database) GetNodesByLabel(label string) {}
//GetNodeByID gets a specific node by ID.
func (d Database) GetNodeByID(id string) {}
//storeNodeTxFunc transaction to store devices from a json.
//relates them to a specific pnd id.
//returns a slice of added devices
func storeNodeTxFunc(id, name string, pndId int64) neo4j.TransactionWork {
return func(tx neo4j.Transaction) (interface{}, error) {
var nodelist []neo4j.Node
query :=
`
MERGE (device:Device {id: $id})
ON CREATE SET device.name = $name
WITH device
MATCH (pnd:PND)
WHERE id(pnd) = $pndId
MERGE (device)-[:BELONGS_TO]->(pnd)
RETURN device
`
result, err := tx.Run(query, map[string]interface{}{
"id": id,
"name": name,
"pndId": pndId,
})
if err != nil {
//TODO: handle neo4j.isServiceUnavailable()
return nil, err
}
for result.Next() {
nodelist = append(nodelist, result.Record().GetByIndex(0).(neo4j.Node))
}
if err = result.Err(); err != nil {
return nil, err
}
return nodelist, nil
}
}
//StoreNode stores the given nodes to the database and adds them to a
//principle networt domain (PND). It is required for a node to belong to a PND.
func (d Database) StoreNode(id, name string) []neo4j.Node {
//TODO: remove this after testing and add own gRPC call for it
testPND := PND{name: "test_PND", description: "very interesting", interfaces: []string{"TAPI", "RESTCONF"}}
pndId := d.StorePND(&testPND).Id()
session := createSession(d.driver, true)
defer session.Close()
result, err := session.WriteTransaction(storeNodeTxFunc(id, name, pndId))
if err != nil {
log.Info(err)
}
log.Info("added/updated devices (count): ", len(result.([]neo4j.Node)))
return result.([]neo4j.Node)
}
//RemoveNodes removes the given nodes and their relationships
func (d Database) RemoveNodes(json string) {}
//RemoveSingleNode removes the given node and their relationship by id.
func (d Database) RemoveSingleNode(id string) {}
//storeLinkTxFunc transaction to store links from a json.
//creates relation between different devices.
//returns a slice of those created relations.
func storeLinkTxFunc(nep1Id, nep2Id string) neo4j.TransactionWork {
return func(tx neo4j.Transaction) (interface{}, error) {
var relationsList []neo4j.Relationship
query :=
// `
// MATCH (d:Device), (d2:Device)
// WHERE d.id = $nep1Id
// AND d2.id = $nep2Id
// CALL apoc.merge.relationship(d,$layerQualifier,{},{}, d2,{})
// YIELD rel
// RETURN rel
// `
`
MATCH (d:Device), (d2:Device)
WHERE d.id = $nep1Id
AND d2.id = $nep2Id
MERGE (d)-[r:connected]->(d2)
`
result, err := tx.Run(query, map[string]interface{}{
"nep1Id": nep1Id,
"nep2Id": nep2Id,
})
if err != nil {
//TODO: handle neo4j.isServiceUnavailable()
return nil, err
}
for result.Next() {
relationsList = append(relationsList, result.Record().GetByIndex(0).(neo4j.Relationship))
}
if err = result.Err(); err != nil {
return nil, err
}
return relationsList, nil
}
}
//StoreLink stores the links between nodes
func (d Database) StoreLink(nep1Id, nep2Id string) []neo4j.Relationship {
session := createSession(d.driver, true)
defer session.Close()
result, err := session.WriteTransaction(storeLinkTxFunc(nep1Id, nep2Id))
if err != nil {
log.Info(err)
}
log.Info("added/updated links (count): ", len(result.([]neo4j.Relationship)))
return result.([]neo4j.Relationship)
}
//storeNodeEdgePointsTxFunc transaction to store interfaces from a json.
//returns count of added/updated interfaces
func storeNodeEdgePointsTxFunc(json string) neo4j.TransactionWork {
return func(tx neo4j.Transaction) (interface{}, error) {
query :=
`
WITH apoc.convert.fromJsonMap($stringToAdd)
AS value
UNWIND value.data as i
MERGE (interface:Interface {id: i.object_id})
ON CREATE SET interface.object_type =i.object_type,
interface.localId = i.object_data.` + "`tapi-object-data`.name[0].value," + `
interface.location = i.object_data.` + "`tapi-object-data`.name[1].value," + `
interface.` + "`containing-node` = i.object_data.`tapi-object-data`.`containing-node`" + `
RETURN count(interface)
`
result, err := tx.Run(query, map[string]interface{}{
"stringToAdd": json,
})
if err != nil {
//TODO: handle neo4j.isServiceUnavailable()
return nil, err
}
if result.Next() {
return result.Record().GetByIndex(0), nil
}
return nil, errors.New("expected a record")
}
}
//TODO: currently this goes over each and every device/interface and adds
// a interface_of relation. -> do it only for the newly added interfaces
//storeNodeEdgePointsRelationTxFunc transaction to create relations between interfaces and devices default:
//returns count of added/updated relations return nil, errors.New("unsupported DeviceType")
func storeNodeEdgePointsRelationTxFunc() neo4j.TransactionWork {
return func(tx neo4j.Transaction) (interface{}, error) {
query :=
`
MATCH (d:Device), (i:Interface)
WHERE d.id = i.` + "`containing-node`" + `
MERGE (i)-[r:INTERFACE_OF]->(d)
RETURN count(r)
`
result, err := tx.Run(query, nil)
if err != nil {
//TODO: handle neo4j.isServiceUnavailable()
return nil, err
}
if result.Next() {
return result.Record().GetByIndex(0), nil
}
return nil, errors.New("expected a record")
}
}
//StoreNodeEdgePoints stores the given node edge points (interfaces)
func (d Database) StoreNodeEdgePoints(json string) {
session := createSession(d.driver, true)
defer session.Close()
result, err := session.WriteTransaction(storeNodeEdgePointsTxFunc(json))
if err != nil {
log.Info(err)
}
_, err = session.WriteTransaction(storeNodeEdgePointsRelationTxFunc())
if err != nil {
log.Info(err)
} }
log.Info("added/updated nodeEdgePoints (count): ", result)
}
//StoreConnections stores relations between nodes
func (d Database) StoreConnections(json string) {}
//StoreTopology creates a new network topology node. Can also create a relation
//the new node and a existing one if desired
func StoreTopology() {}
//RemoveTopology removes the given network topology. This includes the node itself
//aswell as the containing links and relations
func RemoveTopology() {}
//CreateTopologyRelation creates a relation between two given topologies
func CreateTopologyRelation() {}
//CreateLink creates a link between two network elements
func CreateLink() {}
//RemoveLink removes a link between two network elements
func RemoveLink() {}
//Shutdown closes the connection to the database
func (d Database) Shutdown() error {
return d.driver.Close()
} }
package database
import (
"github.com/neo4j/neo4j-go-driver/neo4j"
log "github.com/sirupsen/logrus"
)
//createSession creates a neo4j.Session
func createSession(driver neo4j.Driver, write bool) neo4j.Session {
var sessionConfig neo4j.SessionConfig
if write {
sessionConfig = neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}
} else {
sessionConfig = neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead}
}
session, err := driver.NewSession(sessionConfig)
if err != nil {
log.Info(err)
}
return session
}
package database
import (
"errors"
"github.com/neo4j/neo4j-go-driver/neo4j"
log "github.com/sirupsen/logrus"
)
type tapiDatabaseClient struct {
driver neo4j.Driver
}
//newTapiDatabaseClient creates a new tapiDatabaseClient with the given neo4j.Driver
func newTapiDatabaseClient(d neo4j.Driver) *tapiDatabaseClient {
return &tapiDatabaseClient{
driver: d,
}
}
//storeNodeTxFunc transaction to store devices from a json.
//relates them to a specific pnd id.
//returns a slice of added devices
func storeNodeTxFunc(id, name string, pndId int64) neo4j.TransactionWork {
return func(tx neo4j.Transaction) (interface{}, error) {
var nodelist []neo4j.Node
query :=
`
MERGE (device:Device {id: $id})
ON CREATE SET device.name = $name
WITH device
MATCH (pnd:PND)
WHERE id(pnd) = $pndId
MERGE (device)-[:BELONGS_TO]->(pnd)
RETURN device
`
result, err := tx.Run(query, map[string]interface{}{
"id": id,
"name": name,
"pndId": pndId,
})
if err != nil {
//TODO: handle neo4j.isServiceUnavailable()
return nil, err
}
for result.Next() {
nodelist = append(nodelist, result.Record().GetByIndex(0).(neo4j.Node))
}
if err = result.Err(); err != nil {
return nil, err
}
return nodelist, nil
}
}
//StoreNode stores the given nodes to the database and adds them to a
//principle networt domain (PND). It is required for a node to belong to a PND.
func (d tapiDatabaseClient) StoreNode(id, name string, pndId int64) ([]neo4j.Node, error) {
//TODO: remove this after testing and add own gRPC call for it
// testPND := PND{name: "test_PND", description: "very interesting", interfaces: []string{"TAPI", "RESTCONF"}}
// pndId := d.StorePND(&testPND).Id()
session := createSession(d.driver, true)
defer session.Close()
result, err := session.WriteTransaction(storeNodeTxFunc(id, name, pndId))
if err != nil {
log.Info(err)
}
log.Info("added/updated devices (count): ", len(result.([]neo4j.Node)))
return result.([]neo4j.Node), err
}
//storeLinkTxFunc transaction to store links from a json.
//creates relation between different devices.
//returns a slice of those created relations.
func storeLinkTxFunc(nep1Id, nep2Id string) neo4j.TransactionWork {
return func(tx neo4j.Transaction) (interface{}, error) {
var relationsList []neo4j.Relationship
query :=
`
MATCH (d:Device), (d2:Device)
WHERE d.id = $nep1Id
AND d2.id = $nep2Id
MERGE (d)-[r:connected]->(d2)
`
result, err := tx.Run(query, map[string]interface{}{
"nep1Id": nep1Id,
"nep2Id": nep2Id,
})
if err != nil {
//TODO: handle neo4j.isServiceUnavailable()
return nil, err
}
for result.Next() {
relationsList = append(relationsList, result.Record().GetByIndex(0).(neo4j.Relationship))
}
if err = result.Err(); err != nil {
return nil, err
}
return relationsList, nil
}
}
//StoreLink stores the links between nodes
func (d tapiDatabaseClient) StoreLink(nep1Id, nep2Id string) []neo4j.Relationship {
session := createSession(d.driver, true)
defer session.Close()
result, err := session.WriteTransaction(storeLinkTxFunc(nep1Id, nep2Id))
if err != nil {
log.Info(err)
}
log.Info("added/updated links (count): ", len(result.([]neo4j.Relationship)))
return result.([]neo4j.Relationship)
}
//storeNodeEdgePointsTxFunc transaction to store interfaces from a json.
//returns count of added/updated interfaces
func storeNodeEdgePointsTxFunc(json string) neo4j.TransactionWork {
return func(tx neo4j.Transaction) (interface{}, error) {
query :=
`
WITH apoc.convert.fromJsonMap($stringToAdd)
AS value
UNWIND value.data as i
MERGE (interface:Interface {id: i.object_id})
ON CREATE SET interface.object_type =i.object_type,
interface.localId = i.object_data.` + "`tapi-object-data`.name[0].value," + `
interface.location = i.object_data.` + "`tapi-object-data`.name[1].value," + `
interface.` + "`containing-node` = i.object_data.`tapi-object-data`.`containing-node`" + `
RETURN count(interface)
`
result, err := tx.Run(query, map[string]interface{}{
"stringToAdd": json,
})
if err != nil {
//TODO: handle neo4j.isServiceUnavailable()
return nil, err
}
if result.Next() {
return result.Record().GetByIndex(0), nil
}
return nil, errors.New("expected a record")
}
}
//TODO: currently this goes over each and every device/interface and adds
// a interface_of relation. -> do it only for the newly added interfaces
//storeNodeEdgePointsRelationTxFunc transaction to create relations between interfaces and devices
//returns count of added/updated relations
func storeNodeEdgePointsRelationTxFunc() neo4j.TransactionWork {
return func(tx neo4j.Transaction) (interface{}, error) {
query :=
`
MATCH (d:Device), (i:Interface)
WHERE d.id = i.` + "`containing-node`" + `
MERGE (i)-[r:INTERFACE_OF]->(d)
RETURN count(r)
`
result, err := tx.Run(query, nil)
if err != nil {
//TODO: handle neo4j.isServiceUnavailable()
return nil, err
}
if result.Next() {
return result.Record().GetByIndex(0), nil
}
return nil, errors.New("expected a record")
}
}
//StoreNodeEdgePoints stores the given node edge points (interfaces)
func (d tapiDatabaseClient) StoreNodeEdgePoints(json string) {
session := createSession(d.driver, true)
defer session.Close()
result, err := session.WriteTransaction(storeNodeEdgePointsTxFunc(json))
if err != nil {
log.Info(err)
}
_, err = session.WriteTransaction(storeNodeEdgePointsRelationTxFunc())
if err != nil {
log.Info(err)
}
log.Info("added/updated nodeEdgePoints (count): ", result)
}
...@@ -26,7 +26,7 @@ type clientConfigs struct { ...@@ -26,7 +26,7 @@ type clientConfigs struct {
type Core struct { type Core struct {
//Assert type with clients[key].(*MCPClient) //Assert type with clients[key].(*MCPClient)
clients map[string]interfaces.Client clients map[string]interfaces.Client
database database.Database database *database.Client
config controllerConfig config controllerConfig
IsRunning chan bool IsRunning chan bool
} }
...@@ -51,8 +51,10 @@ func (c *Core) Init(socket, configFileController, configFileClient string, IsRun ...@@ -51,8 +51,10 @@ func (c *Core) Init(socket, configFileController, configFileClient string, IsRun
} }
// AttachDatabase connects to the database and passes the connectio to the controller core // AttachDatabase connects to the database and passes the connectio to the controller core
func (c *Core) AttachDatabase() { func (c *Core) AttachDatabase() error {
c.database = database.NewDatabaseClient(c.config.DatabaseSocket, c.config.DatabaseUser, c.config.DatabasePassword, c.config.DatabaseCrypto) dbClient, err := database.NewDatabaseClient(c.config.DatabaseSocket, c.config.DatabaseUser, c.config.DatabasePassword, c.config.DatabaseCrypto)
c.database = dbClient
return err
} }
// Shutdown waits for the shutdown signal and gracefully shuts down once it arrived // Shutdown waits for the shutdown signal and gracefully shuts down once it arrived
...@@ -103,7 +105,7 @@ func (c *Core) readClientConfig(configFileClient string) error { ...@@ -103,7 +105,7 @@ func (c *Core) readClientConfig(configFileClient string) error {
return err return err
} }
for _, client := range clients.Client { for _, client := range clients.Client {
c.clients[client.Identifier] = ciena.NewMCPClient(client.Endpoint, client.Username, client.Password, &c.database, &client) c.clients[client.Identifier] = ciena.NewMCPClient(client.Endpoint, client.Username, client.Password, c.database, &client)
} }
return nil return nil
} }
...@@ -15,7 +15,7 @@ func StartAndRun(socket, filename string, IsRunningChannel chan bool) { ...@@ -15,7 +15,7 @@ func StartAndRun(socket, filename string, IsRunningChannel chan bool) {
// Init the Core // Init the Core
core := Core{ core := Core{
clients: make(map[string]interfaces.Client), clients: make(map[string]interfaces.Client),
database: database.Database{}, database: &database.Client{},
} }
core.Init(socket, filename, "", IsRunningChannel) core.Init(socket, filename, "", IsRunningChannel)
// Start the GRCP CLI // Start the GRCP CLI
......
...@@ -23,7 +23,7 @@ import ( ...@@ -23,7 +23,7 @@ import (
type MCPClient struct { type MCPClient struct {
transport *httptransport.Runtime transport *httptransport.Runtime
client *apiclient.ServiceTopologyTAPI client *apiclient.ServiceTopologyTAPI
database *database.Database database *database.Client
buffer *bytes.Buffer buffer *bytes.Buffer
config *interfaces.ClientConfig config *interfaces.ClientConfig
device ygot.GoStruct device ygot.GoStruct
...@@ -36,7 +36,7 @@ func (c MCPClient) GetConfig() interfaces.ClientConfig { ...@@ -36,7 +36,7 @@ func (c MCPClient) GetConfig() interfaces.ClientConfig {
} }
//NewMCPClient creates a Ciena flavores TAPI client //NewMCPClient creates a Ciena flavores TAPI client
func NewMCPClient(endpoint, username, password string, database *database.Database, config *interfaces.ClientConfig) *MCPClient { func NewMCPClient(endpoint, username, password string, database *database.Client, config *interfaces.ClientConfig) *MCPClient {
// create the transport // create the transport
transport := httptransport.New(endpoint, "/", nil) transport := httptransport.New(endpoint, "/", nil)
transport.Transport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} transport.Transport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
...@@ -119,7 +119,8 @@ func (c *MCPClient) GetLinks() error { ...@@ -119,7 +119,8 @@ func (c *MCPClient) GetLinks() error {
} }
count++ count++
} }
c.database.StoreLink(nep1, nep2) //c.database.StoreLink(nep1, nep2)
log.Info(nep1, nep2)
} }
log.Debug(c.buffer.Next(25)) log.Debug(c.buffer.Next(25))
...@@ -135,6 +136,9 @@ func (c *MCPClient) GetNodes() error { ...@@ -135,6 +136,9 @@ func (c *MCPClient) GetNodes() error {
return err return err
} }
testPND := database.PND{Name: "test_PND", Description: "very interesting", Interfaces: []string{"TAPI", "RESTCONF"}}
pndId := c.database.StorePND(&testPND).Id()
json := gjson.Get(c.buffer.String(), c.config.GjsonDefaultPath) json := gjson.Get(c.buffer.String(), c.config.GjsonDefaultPath)
for _, jsonEntry := range json.Array() { for _, jsonEntry := range json.Array() {
...@@ -145,7 +149,8 @@ func (c *MCPClient) GetNodes() error { ...@@ -145,7 +149,8 @@ func (c *MCPClient) GetNodes() error {
//lot of them. //lot of them.
log.Info(err) log.Info(err)
} }
c.database.StoreNode(*dest.Uuid, *dest.Name["nativeName"].Value) c.database.StoreDevice(dest, pndId)
// c.database.StoreNode(*dest.Uuid, *dest.Name["nativeName"].Value)
} }
log.Debug(c.buffer.Next(25)) log.Debug(c.buffer.Next(25))
...@@ -165,7 +170,7 @@ func (c *MCPClient) GetNodeEdgePoints() error { ...@@ -165,7 +170,7 @@ func (c *MCPClient) GetNodeEdgePoints() error {
if err := tapi.Unmarshal(c.buffer.Bytes(), dest); err != nil { if err := tapi.Unmarshal(c.buffer.Bytes(), dest); err != nil {
return err return err
} }
c.database.StoreNodeEdgePoints(c.buffer.String()) // c.database.StoreNodeEdgePoints(c.buffer.String())
log.Debug(c.buffer.Next(25)) log.Debug(c.buffer.Next(25))
return err return err
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment