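// The lines below are page-navigation residue that was captured together with
// this file. They are not part of the program and are kept only as comments so
// that the file parses as Go.
//
//	Skip to content
//	Snippets Groups Projects
//	Code owners
//	Assign users and groups as approvers for specific file changes. Learn more.
//	client.go 9.60 KiB
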
package database

import (
	"errors"
	"github.com/neo4j/neo4j-go-driver/neo4j"
	log "github.com/sirupsen/logrus"
)

//Database is the client type of this package. It wraps the neo4j driver
//that the Store* methods use to open sessions and that Shutdown closes.
type Database struct {
	driver neo4j.Driver //set by NewDatabaseClient from createDriver's result
}

//PND is a principal network domain.
//StorePND passes its fields to storePndTxFunc, which writes them to a node
//labelled PND.
type PND struct {
	name        string   //key the PND node is merged on
	description string   //written to pnd.description when the node is created
	interfaces  []string //written to pnd.interfaces when the node is created
}

//NewDatabaseClient builds a Database whose driver is created from the given
//connection uri, basic-auth credentials and encryption flag.
//A failure to create the driver is only logged by createDriver; it is not
//reported to the caller of this function.
func NewDatabaseClient(uri, username, password string, encrypted bool) Database {
	var client Database
	client.driver = createDriver(uri, username, password, encrypted)

	return client
}

//createDriver asks neo4j.NewDriver for a driver connected to uri, using
//basic authentication (with an empty realm) and the requested encryption
//setting.
//If neo4j.NewDriver reports an error it is logged, and whatever driver value
//was returned alongside it is still handed back to the caller.
func createDriver(uri, username, password string, encrypted bool) neo4j.Driver {
	auth := neo4j.BasicAuth(username, password, "")
	applyEncryption := func(cfg *neo4j.Config) {
		cfg.Encrypted = encrypted
	}

	drv, err := neo4j.NewDriver(uri, auth, applyEncryption)
	if err != nil {
		log.Info("failed creating database driver:", err)
	}

	return drv
}

//createSession opens a neo4j.Session on the given driver.
//If write is true the session uses neo4j.AccessModeWrite, otherwise
//neo4j.AccessModeRead.
//It returns nil when driver is nil or when the session could not be created;
//the reason is logged. Callers must check for nil before using the session.
func createSession(driver neo4j.Driver, write bool) neo4j.Session {
	//createDriver returns its driver even when creation failed, so the value
	//may be nil here; calling NewSession on a nil driver would panic.
	if driver == nil {
		log.Error("cannot create database session: driver is nil")
		return nil
	}

	mode := neo4j.AccessModeRead
	if write {
		mode = neo4j.AccessModeWrite
	}

	session, err := driver.NewSession(neo4j.SessionConfig{AccessMode: mode})
	if err != nil {
		log.Error(err)
		return nil
	}

	return session
}

//storePndTxFunc builds the transaction that stores a PND in the database.
//The query merges a PND node on name; description and interfaces are only
//written when the node is created (ON CREATE SET), so an already existing
//node with that name is returned unchanged.
//The transaction yields the first column of the first record (the pnd node).
//It fails with the error reported by the result if fetching failed, or with
//"expected a record" if the query produced no record.
func storePndTxFunc(name, description string, interfaces []string) neo4j.TransactionWork {
	return func(tx neo4j.Transaction) (interface{}, error) {
		query :=
			`
			MERGE (pnd:PND {name: $name})
			ON CREATE SET pnd.description = $description,
				pnd.interfaces = $interfaces
			RETURN pnd
			`

		result, err := tx.Run(query, map[string]interface{}{
			"name":        name,
			"description": description,
			"interfaces":  interfaces,
		})

		if err != nil {
			//TODO: handle neo4j.isServiceUnavailable()
			return nil, err
		}

		if result.Next() {
			return result.Record().GetByIndex(0), nil
		}

		//Next is also false when fetching the record failed; report that
		//error instead of hiding it behind the generic message below.
		if err = result.Err(); err != nil {
			return nil, err
		}

		return nil, errors.New("expected a record")
	}
}

//StorePND stores the given principal network domain in a write transaction
//built by storePndTxFunc.
//It returns the node yielded by that transaction, or nil if pnd is nil, no
//session could be opened, the transaction failed or its result is not a
//neo4j.Node. Failures are logged.
func (d Database) StorePND(pnd *PND) neo4j.Node {
	if pnd == nil {
		log.Error("cannot store PND: pnd is nil")
		return nil
	}

	session := createSession(d.driver, true)
	if session == nil {
		//createSession has already logged why no session is available
		return nil
	}
	defer session.Close()

	result, err := session.WriteTransaction(storePndTxFunc(pnd.name, pnd.description, pnd.interfaces))
	if err != nil {
		//result carries no node in this case; asserting its type would panic
		log.Error(err)
		return nil
	}

	node, ok := result.(neo4j.Node)
	if !ok {
		log.Error("storing PND returned an unexpected result type")
		return nil
	}

	log.Info("created/updated PND with id: ", node.Id())
	return node
}

//RemovePND is meant to remove the principal network domain with the given id.
//Not implemented yet: the body is empty, so calling it has no effect.
func (d Database) RemovePND(id string) {}

//GetPNDByID is meant to look up a specific PND by the given ID.
//Not implemented yet: the body is empty and nothing is returned.
func (d Database) GetPNDByID(id string) {}

//GetNodesByLabel is meant to fetch all nodes that carry the given label.
//Not implemented yet: the body is empty and nothing is returned.
func (d Database) GetNodesByLabel(label string) {}

//GetNodeByID is meant to fetch a specific node by its ID.
//Not implemented yet: the body is empty and nothing is returned.
func (d Database) GetNodeByID(id string) {}

//storeNodesTxFunc builds the transaction that stores devices described by a
//JSON document and relates them to a PND.
//json is handed to the query as $stringToAdd and parsed there with
//apoc.convert.fromJsonMap; every entry of its data list is merged as a
//Device node keyed on object_id, with the remaining properties written only
//when the node is created. id is the node id of the PND that each device is
//linked to through a BELONGS_TO relationship.
//The transaction yields the returned device nodes as a []neo4j.Node.
func storeNodesTxFunc(json string, id int64) neo4j.TransactionWork {
	return func(tx neo4j.Transaction) (interface{}, error) {
		query :=
			`
			WITH apoc.convert.fromJsonMap($stringToAdd)
			AS value
			UNWIND value.data as d
			MERGE (device:Device {id: d.object_id})
			ON CREATE SET device.nativeName = d.object_data.` + "`tapi-object-data`.name[0].value," + `
				device.deviceType = d.object_data.` + "`tapi-object-data`.name[1].value," + `
				device.serialNumber = d.object_data.` + "`tapi-object-data`.name[2].value," + `
				device.softwareVersion = d.object_data.` + "`tapi-object-data`.name[3].value," + `
				device.` + "`operational-state` = d.object_data.`tapi-object-data`.`operational-state`" + `
			WITH device
			MATCH (pnd:PND)
			WHERE id(pnd) = $pnd
			MERGE (device)-[:BELONGS_TO]->(pnd)
			RETURN device
			`

		params := map[string]interface{}{
			"stringToAdd": json,
			"pnd":         id,
		}

		records, runErr := tx.Run(query, params)
		if runErr != nil {
			//TODO: handle neo4j.isServiceUnavailable()
			return nil, runErr
		}

		//collect the first column of every record as a device node
		var devices []neo4j.Node
		for records.Next() {
			device := records.Record().GetByIndex(0).(neo4j.Node)
			devices = append(devices, device)
		}

		//an error that ended the iteration early is reported here
		if iterErr := records.Err(); iterErr != nil {
			return nil, iterErr
		}

		return devices, nil
	}
}

//StoreNodes stores the nodes described by the given JSON document and adds
//them to a principal network domain (PND). It is required for a node to
//belong to a PND; for now a hard-coded test PND is stored and used.
//It returns the stored device nodes, or nil if the PND could not be stored,
//no session could be opened, the transaction failed or its result has an
//unexpected type. Failures are logged.
func (d Database) StoreNodes(json string) []neo4j.Node {
	//TODO: remove this after testing and add own gRPC call for it
	testPND := PND{name: "test_PND", description: "very interesting", interfaces: []string{"TAPI", "RESTCONF"}}
	pndNode := d.StorePND(&testPND)
	if pndNode == nil {
		//without a PND node there is no id to relate the devices to
		log.Error("cannot store nodes: storing the PND failed")
		return nil
	}

	session := createSession(d.driver, true)
	if session == nil {
		//createSession has already logged why no session is available
		return nil
	}
	defer session.Close()

	result, err := session.WriteTransaction(storeNodesTxFunc(json, pndNode.Id()))
	if err != nil {
		//result carries no nodes in this case; asserting its type would panic
		log.Error(err)
		return nil
	}

	nodes, ok := result.([]neo4j.Node)
	if !ok {
		log.Error("storing nodes returned an unexpected result type")
		return nil
	}

	log.Info("added/updated devices (count): ", len(nodes))
	return nodes
}

//RemoveNodes is meant to remove the given nodes and their relationships.
//Not implemented yet: the body is empty, so calling it has no effect.
func (d Database) RemoveNodes(json string) {}

//RemoveSingleNode is meant to remove the node with the given id together
//with its relationships.
//Not implemented yet: the body is empty, so calling it has no effect.
func (d Database) RemoveSingleNode(id string) {}

//storeLinksTxFunc builds the transaction that stores links described by a
//JSON document as relationships between devices.
//json is handed to the query as $stringToAdd and parsed there with
//apoc.convert.fromJsonMap. For every entry of its data list the two Device
//nodes whose id equals the node-uuid of the first and second node-edge-point
//are matched, and apoc.merge.relationship merges a relationship between them
//whose type is the entry's layer-qualifier.
//The transaction yields the returned relationships as a []neo4j.Relationship.
func storeLinksTxFunc(json string) neo4j.TransactionWork {
	return func(tx neo4j.Transaction) (interface{}, error) {
		query :=
			`
			WITH apoc.convert.fromJsonMap($stringToAdd)
			AS value
			UNWIND value.data as l
			MATCH (d:Device), (d2:Device)
			WHERE d.id = l.object_data.` + "`tapi-object-data`.`node-edge-point`[0].`node-uuid`" + `
			AND d2.id = l.object_data.` + "`tapi-object-data`.`node-edge-point`[1].`node-uuid`" + `
			CALL apoc.merge.relationship(d,l.object_data.` + "`tapi-object-data`.`layer-qualifier`,{},{}, d2,{})" + `
			YIELD rel
			RETURN rel
			`

		params := map[string]interface{}{
			"stringToAdd": json,
		}

		records, runErr := tx.Run(query, params)
		if runErr != nil {
			//TODO: handle neo4j.isServiceUnavailable()
			return nil, runErr
		}

		//collect the first column of every record as a relationship
		var links []neo4j.Relationship
		for records.Next() {
			link := records.Record().GetByIndex(0).(neo4j.Relationship)
			links = append(links, link)
		}

		//an error that ended the iteration early is reported here
		if iterErr := records.Err(); iterErr != nil {
			return nil, iterErr
		}

		return links, nil
	}
}

//StoreLinks stores the links between nodes described by the given JSON
//document, using the write transaction built by storeLinksTxFunc.
//It returns the stored relationships, or nil if no session could be opened,
//the transaction failed or its result has an unexpected type. Failures are
//logged.
func (d Database) StoreLinks(json string) []neo4j.Relationship {
	session := createSession(d.driver, true)
	if session == nil {
		//createSession has already logged why no session is available
		return nil
	}
	defer session.Close()

	result, err := session.WriteTransaction(storeLinksTxFunc(json))
	if err != nil {
		//result carries no links in this case; asserting its type would panic
		log.Error(err)
		return nil
	}

	links, ok := result.([]neo4j.Relationship)
	if !ok {
		log.Error("storing links returned an unexpected result type")
		return nil
	}

	log.Info("added/updated links (count): ", len(links))

	return links
}

//storeNodeEdgePointsTxFunc builds the transaction that stores interfaces
//(node edge points) described by a JSON document.
//json is handed to the query as $stringToAdd and parsed there with
//apoc.convert.fromJsonMap; every entry of its data list is merged as an
//Interface node keyed on object_id, with the remaining properties written
//only when the node is created.
//The transaction yields the value of count(interface) from the first record.
//It fails with the error reported by the result if fetching failed, or with
//"expected a record" if the query produced no record.
func storeNodeEdgePointsTxFunc(json string) neo4j.TransactionWork {
	return func(tx neo4j.Transaction) (interface{}, error) {
		query :=
			`
			WITH apoc.convert.fromJsonMap($stringToAdd)
			AS value
			UNWIND value.data as i
			MERGE (interface:Interface {id: i.object_id})
			ON CREATE SET interface.object_type =i.object_type,
			interface.localId = i.object_data.` + "`tapi-object-data`.name[0].value," + `
			interface.location = i.object_data.` + "`tapi-object-data`.name[1].value," + `
			interface.` + "`containing-node` = i.object_data.`tapi-object-data`.`containing-node`" + `
			RETURN count(interface)
			`

		result, err := tx.Run(query, map[string]interface{}{
			"stringToAdd": json,
		})

		if err != nil {
			//TODO: handle neo4j.isServiceUnavailable()
			return nil, err
		}

		if result.Next() {
			return result.Record().GetByIndex(0), nil
		}

		//Next is also false when fetching the record failed; report that
		//error instead of hiding it behind the generic message below.
		if err = result.Err(); err != nil {
			return nil, err
		}

		return nil, errors.New("expected a record")
	}
}

//TODO: currently this goes over each and every device/interface and adds
//		a interface_of relation. -> do it only for the newly added interfaces

//storeNodeEdgePointsRelationTxFunc builds the transaction that relates
//interfaces to devices.
//The query matches every Device and Interface pair where the device id equals
//the interface's containing-node property and merges an INTERFACE_OF
//relationship from the interface to the device.
//The transaction yields the value of count(r) from the first record.
//It fails with the error reported by the result if fetching failed, or with
//"expected a record" if the query produced no record.
func storeNodeEdgePointsRelationTxFunc() neo4j.TransactionWork {
	return func(tx neo4j.Transaction) (interface{}, error) {
		query :=
			`
			MATCH (d:Device), (i:Interface)
			WHERE d.id = i.` + "`containing-node`" + `
			MERGE (i)-[r:INTERFACE_OF]->(d)
			RETURN count(r)
			`

		result, err := tx.Run(query, nil)

		if err != nil {
			//TODO: handle neo4j.isServiceUnavailable()
			return nil, err
		}

		if result.Next() {
			return result.Record().GetByIndex(0), nil
		}

		//Next is also false when fetching the record failed; report that
		//error instead of hiding it behind the generic message below.
		if err = result.Err(); err != nil {
			return nil, err
		}

		return nil, errors.New("expected a record")
	}
}

//StoreNodeEdgePoints stores the node edge points (interfaces) described by
//the given JSON document and then relates interfaces to their devices.
//Both steps run as write transactions on the same session. Failures are
//logged; nothing is returned to the caller. If no session can be opened the
//method returns without doing anything.
func (d Database) StoreNodeEdgePoints(json string) {
	session := createSession(d.driver, true)
	if session == nil {
		//createSession has already logged why no session is available
		return
	}
	defer session.Close()

	count, storeErr := session.WriteTransaction(storeNodeEdgePointsTxFunc(json))
	if storeErr != nil {
		log.Error(storeErr)
	}

	//The relation query works on all devices and interfaces in the database,
	//so it is still attempted when the store step above failed.
	if _, relErr := session.WriteTransaction(storeNodeEdgePointsRelationTxFunc()); relErr != nil {
		log.Error(relErr)
	}

	//only report a count when the store step actually produced one
	if storeErr == nil {
		log.Info("added/updated nodeEdgePoints (count): ", count)
	}
}

//StoreConnections is meant to store relations between nodes.
//Not implemented yet: the body is empty, so calling it has no effect.
func (d Database) StoreConnections(json string) {}

//StoreTopology is meant to create a new network topology node and, if
//desired, a relation between the new node and an existing one.
//Not implemented yet: the body is empty, so calling it has no effect.
func StoreTopology() {}

//RemoveTopology is meant to remove the given network topology. This includes
//the node itself as well as the links and relations it contains.
//Not implemented yet: the body is empty, so calling it has no effect.
func RemoveTopology() {}

//CreateTopologyRelation is meant to create a relation between two given
//topologies.
//Not implemented yet: the body is empty, so calling it has no effect.
func CreateTopologyRelation() {}

//CreateLink is meant to create a link between two network elements.
//Not implemented yet: the body is empty, so calling it has no effect.
func CreateLink() {}

//RemoveLink is meant to remove a link between two network elements.
//Not implemented yet: the body is empty, so calling it has no effect.
func RemoveLink() {}

//Shutdown closes the connection to the database by closing the driver.
//It returns the error reported by the driver's Close, or nil when there is
//no driver to close.
func (d Database) Shutdown() error {
	//createDriver returns its driver even when creation failed, so the field
	//may be nil; calling Close on a nil driver would panic.
	if d.driver == nil {
		return nil
	}

	return d.driver.Close()
}