Skip to content
Snippets Groups Projects
Commit 546c9329 authored by Alfred Ashur Oshana's avatar Alfred Ashur Oshana
Browse files

new shared crud and schemas and libs

parent d8e3ae58
No related branches found
No related tags found
No related merge requests found
Showing with 975 additions and 2 deletions
import logging as log
from typing import List
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from pydantic import BaseModel
from sqlalchemy.orm import Session
from pilab.crud.Util import from_json, update_attrs
from pilab.schemas.Cube import CubesType
from pilab.schemas.Pi import PiType
from pilab.crud.Hardware import Controller, Switch
from pilab.crud.Pi import Pi
from pilab.events import meta
log.getLogger('sqlalchemy.engine').setLevel(log.WARNING)
class Cube(object):
@staticmethod#DONE
def get(db: Session, cube_id: int = None, switch_id: int = None, controller_id : int = None):
if cube_id:
db_cube = db.query(CubesType).filter(CubesType.id == cube_id).first()
elif switch_id:
db_cube = db.query(CubesType).filter(CubesType.switch_id == switch_id).first()
elif controller_id:
db_cube = db.query(CubesType).filter(CubesType.controller_id == controller_id).first()
controller = Controller.get(db, host_id = db_cube.controller_id) if db_cube.controller_id else None
switch = Switch.get(db, host_id = db_cube.switch_id)
pis = Pi.getAll(db, cube_id = db_cube.id)
return meta.Cube(
id=db_cube.id,
controller=controller,
switch=switch,
pis=pis,
)
@staticmethod#DONE
def create(db: Session, cube: meta.Cube):
if cube.controller:
controller_id = Controller.create(db, cube.controller).id
else:
controller_id = None
switch_id = Switch.create(db, cube.switch).id
db_cube = CubesType(
id=cube.id,
controller_id=controller_id,
switch_id=switch_id,
)
db.add(db_cube)
db.flush()
for pi in cube.pis:
Pi.create(db, pi)
return db_cube.id
@staticmethod#DONE
def delete(db: Session, cube_id: int):
db_cube = db.query(CubesType).filter(CubesType.id == cube_id).first()
db_pis = db.query(PiType).filter(
PiType.cube_id == cube_id).all()
for db_pi in db_pis:
Pi.delete(db, db_pi.host_id)
db.delete(db_cube)
if db_cube.controller_id:
Controller.delete(db, db_cube.controller_id)
Switch.delete(db, db_cube.switch_id)
db.flush()
@staticmethod
def get_ids(db: Session):
ids = []
db_ids = db.query(CubesType.id).all()
for i, in db_ids:
ids.append(i)
return ids
@staticmethod
def is_valid_id(db: Session, cube_id: int):
cube = db.query(CubesType).filter(
CubesType.id == cube_id).first()
return True if cube else False
@staticmethod
def get_all(db: Session):
ids = Cube.get_ids(db)
cubes = []
for i in ids:
cube = Cube.get(db, i)
cubes.append(cube)
return cubes
import logging as log
from sqlalchemy.orm import Session
from pilab.crud.Util import from_json, update_attrs
from pilab.crud.Host import Host
from pilab.schemas.Cube import CubesType
from pilab.schemas.Pi import PiType
from pilab.schemas.Hardware import SwitchType, ControllerType
from pilab.events import meta
log.getLogger('sqlalchemy.engine').setLevel(log.WARNING)
class Switch(object):
@staticmethod
def create(db: Session, switch: meta.Switch):#DONE
host = Host.create(db, switch)
db_switch = from_json(SwitchType, switch.dict(), host_id=host.id)
db.add(db_switch)
db.flush()
switch = meta.Switch(**vars(db_switch), **host.dict())
return switch
@staticmethod
def get(db: Session, host_id: int = None, cube_id: int = None, pi_id: int = None):
if host_id:
db_switch = db.query(SwitchType).filter(
SwitchType.host_id == host_id
).first()
elif cube_id:
db_cube = db.query(CubesType).filter(
CubesType.switch_id == host_id
).first()
db_switch = db.query(SwitchType).filter(
SwitchType.host_id == db_cube.switch_id
).first()
elif pi_id:
db_pi= db.query(PiType).filter(
PiType.host_id == pi_id
).first()
db_cube = db.query(CubesType).filter(
CubesType.id == db_pi.cube_id
).first()
db_switch = db.query(SwitchType).filter(
SwitchType.host_id == db_cube.switch_id
).first()
host = Host.get(db, db_switch.host_id)
return meta.Switch(**vars(db_switch), **host.dict())
@staticmethod#DONE
def delete(db: Session, host_id: int):
db_switch = db.query(SwitchType).filter(
SwitchType.host_id == host_id
).first()
db.delete(db_switch)
db.flush()
Host.delete(db, host_id)
@staticmethod
def update(db: Session, switch: meta.HostUpdate, _id: int):
host = Host.update(db, switch, _id)
return host
@staticmethod
def getAll(db: Session):
switches = []
db_switches = db.query(SwitchType).all()
for db_switch in db_switches:
host = Host.get(db, db_switch.host_id)
switches.append(meta.Switch(**vars(db_switch), **host.dict()))
return switches
class Controller(object):
@staticmethod
def create(db: Session, controller: meta.Controller):
host = Host.create(db, controller)
db_controller = from_json(ControllerType, controller.dict(), host_id=host.id)
db.add(db_controller)
db.flush()
controller = meta.Controller(**vars(db_controller), **host.dict())
return controller
@staticmethod#DONE
def delete(db: Session, host_id: int):
db_controller = db.query(ControllerType).filter(
ControllerType.host_id == host_id
).first()
db.delete(db_controller)
db.flush()
Host.delete(db, host_id)
@staticmethod
def get(db: Session, host_id: int = None, cube_id: int = None, pi_id: int = None):
if host_id:
db_controller = db.query(ControllerType).filter(
ControllerType.host_id == host_id
).first()
elif cube_id:
db_cube = db.query(CubesType).filter(
CubesType.controller_id == host_id
).first()
db_controller = db.query(ControllerType).filter(
ControllerType.host_id == db_cube.controller_id
).first()
elif pi_id:
db_pi= db.query(PiType).filter(
PiType.host_id == pi_id
).first()
db_cube = db.query(CubesType).filter(
CubesType.id == db_pi.cube_id
).first()
db_controller= db.query(ControllerType).filter(
ControllerType.host_id == db_cube.controller_id
).first()
host = Host.get(db, db_controller.host_id)
return meta.Controller(**vars(db_controller), **host.dict())
@staticmethod
def update(db: Session, controller: meta.HostUpdate, _id: int):
host = Host.update(db, controller, _id)
return host
@staticmethod
def getAll(db: Session):
controllers = []
db_controllers = db.query(ControllerType).all()
for db_controller in db_controllers:
host = Host.get(db, db_controller.host_id)
controllers.append(meta.Controller(**vars(db_controller), **host.dict()))
return controllers
\ No newline at end of file
import logging as log
from typing import List
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from pydantic import BaseModel
from sqlalchemy import inspect
from sqlalchemy.orm import Session, class_mapper
from pilab.crud.Util import from_json, update_attrs
from pilab.schemas.Host import HostType
from pilab.events import meta
log.getLogger('sqlalchemy.engine').setLevel(log.WARNING)
class Host(object):
@staticmethod
def create(db: Session, host: meta.Host):#DONE
db_host = from_json(HostType, host.dict())
db.add(db_host)
db.flush()
resp_host = meta.Host(**vars(db_host))
return resp_host
@staticmethod
def update(db: Session, host: meta.HostUpdate, host_id: int):
db_host = db.query(HostType).filter(
HostType.id == host_id).first()
update_attrs(host, db_host)
db.flush()
resp_host = meta.Host(**vars(db_host))
return resp_host
@staticmethod
def get(db: Session, id: int):
db_host = db.query(HostType).filter(
HostType.id == id).first()
return meta.Host(**vars(db_host))
@staticmethod
def delete(db: Session, host_id: int):#DONE
db_host = db.query(HostType).filter(
HostType.id == host_id).first()
db.delete(db_host)
db.flush()
return
@staticmethod
def is_valid_id(db: Session, host_id: int):
pi = db.query(HostType).filter(
HostType.host_id == host_id).first()
return True if pi else False
\ No newline at end of file
import logging as log
from typing import List
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from pydantic import BaseModel
from sqlalchemy import inspect
from sqlalchemy.orm import Session, class_mapper
from pilab.crud.Util import from_json, update_attrs
from pilab.schemas.Image import ImageType, ScriptType, UserDataType
from pilab.events import meta
log.getLogger('sqlalchemy.engine').setLevel(log.WARNING)
class Image(object):
@staticmethod
def create(db: Session, **kwargs):
db_image = from_json(ImageType, kwargs, change=datetime.now(timezone.utc))
db.add(db_image)
db.flush()
return db_image.id
@staticmethod
def delete(db: Session, image_id: int):
db_image = db.query(ImageType).filter(
ImageType.id == image_id
).first()
if db_image:
db.delete(db_image)
db.flush()
@staticmethod
def get(db: Session, _id: int = None, name: str = None, version: str = None, username: str = None):
"""Returns images from the postgres database. List or single image.
filter by id to get specific image.
filter by name to get all images with name
filter by name and version to get specific image. Version without name will be omitted.
filter by username to get all public images and user specific images. Get all images if None
"""
def process_images(data):
if isinstance(data, list):
images = []
for image in data or []:
image.change = image.change.replace(tzinfo=ZoneInfo('UTC'))
if username is None or image.public or image.owner == username:
images.append(meta.Image(
**vars(image)))
return images
else:
image = data
if image:
image.change = image.change.replace(tzinfo=ZoneInfo('UTC'))
return meta.Image(
**vars(image)) if username is None or image.public or data.owner == username else None
else:
return None
if _id:
db_image = db.query(ImageType).filter(ImageType.id == _id).first()
return process_images(db_image)
elif name and not version:
db_images = db.query(ImageType).filter(ImageType.name == name).all()
return process_images(db_images)
elif name and version:
db_image = db.query(ImageType).filter(
ImageType.name == name,
ImageType.version == version
).first()
return process_images(db_image)
else:
db_images = db.query(ImageType).all()
return process_images(db_images)
@staticmethod
def update_size(db: Session, image_id: int, size: int):
db_image = db.query(ImageType).filter(
ImageType.id == image_id
).first()
db_image.size = size
db_image.change = datetime.now(timezone.utc)
db.flush()
return
@staticmethod
def update_script(db: Session, image_id: int, script_id: int):
db_image = db.query(ImageType).filter(
ImageType.id == image_id
).first()
db_image.script_id = script_id
db_image.change = datetime.now(timezone.utc)
db.flush()
return
@staticmethod
def get_id(db: Session, name: str, version: str):
db_image = db.query(ImageType).filter(
ImageType.name == name,
ImageType.version == version
).first()
if db_image:
return db_image.id
else:
return None
@staticmethod
def is_valid_id(db: Session, image_id: int):
cube = db.query(ImageType).filter(
ImageType.id == image_id).first()
return True if cube else False
class UserData(object):
@staticmethod
def create(db: Session, data: str, owner: str):
db_data = UserDataType(
data=data,
owner=owner
)
db.add(db_data)
db.flush()
return meta.UserData(**vars(db_data))
@staticmethod
def update(db: Session, data: str, _id: int):
db_data = db.query(UserDataType).filter(
UserDataType.id == _id).first()
if db_data:
db_data.data = data
db.flush()
return meta.UserData(**vars(db_data))
@staticmethod
def delete(db: Session, _id: int):
db_data = db.query(UserDataType).filter(UserDataType.id == _id).first()
if db_data:
db.delete(db_data)
db.flush()
@staticmethod
def get(db: Session, _id: int = None, owner: str = None):
if _id:
db_data = db.query(UserDataType).filter(UserDataType.id == _id).first()
return meta.UserData(**vars(db_data))
elif owner:
db_datas = db.query(UserDataType).filter(UserDataType.owner == owner).all()
return [meta.UserData(**vars(data)) for data in db_datas]
else:
db_datas = db.query(UserDataType).all()
return [meta.UserData(**vars(data)) for data in db_datas]
@staticmethod
def is_valid_id(db: Session, _id: int):
db_data = db.query(UserDataType).filter(
UserDataType.id == _id).first()
return True if db_data else False
class Script(object):
@staticmethod
def create(db: Session, script: str, script_chroot: str, owner: str, name: str, read_only: bool):
db_script = ScriptType(
owner=owner,
name=name,
read_only=read_only,
script=script,
script_chroot=script_chroot
)
db.add(db_script)
db.flush()
return meta.Script(**vars(db_script))
@staticmethod
def update(db: Session, script_id: int, script: str = None, script_chroot: str = None, name: str = None):
db_script = db.query(ScriptType).filter(ScriptType.id == script_id).first()
if script:
db_script.script = script
if script_chroot:
db_script.script_chroot = script_chroot
if name:
db_script.name = name
db.flush()
return meta.Script(**vars(db_script))
@staticmethod
def delete(db: Session, script_id: int):
db_script = db.query(ScriptType).filter(
ScriptType.id == script_id
).first()
if db_script:
db.delete(db_script)
db.flush()
@staticmethod
def get(db: Session, script_id: int = None, name: str = None):
if script_id:
db_script = db.query(ScriptType).filter(ScriptType.id == script_id).first()
return meta.Script(**vars(db_script)) if db_script else None
elif name:
db_script = db.query(ScriptType).filter(ScriptType.name == name).first()
return meta.Script(**vars(db_script)) if db_script else None
else:
db_scripts = db.query(ScriptType).all()
return [meta.Script(**vars(p)) for p in db_scripts]
@staticmethod
def is_valid_id(db: Session, script_id: int):
script = db.query(ScriptType).filter(
ScriptType.id == script_id).first()
return True if script else False
\ No newline at end of file
import logging as log
from typing import List
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from pydantic import BaseModel
from sqlalchemy import inspect
from sqlalchemy.orm import Session, class_mapper
from pilab.crud.Util import from_json, update_attrs
from pilab.schemas.Key import KeyType
from pilab.events import meta
log.getLogger('sqlalchemy.engine').setLevel(log.WARNING)
class Key(object):
@staticmethod
def create(db: Session, **kwargs):
db_key = from_json(KeyType, kwargs)
db.add(db_key)
db.flush()
return db_key.id
@staticmethod
def delete(db: Session, key_id: str):#DONE
db_key = db.query(KeyType).filter(
KeyType.id == key_id
).first()
db.delete(db_key)
db.flush()
return
@staticmethod
def get(db: Session, id: str):#DONE
db_key = db.query(KeyType).filter(KeyType.id == id).first()
return meta.Key(**vars(db_key))
@staticmethod
def getAll(db: Session, owner: str = None):#DONE
if owner:
db_keys = db.query(KeyType).filter(KeyType.owner == owner).all()
else:
db_keys = db.query(KeyType).all()
keys = []
for db_key in db_keys:
key = meta.Key(**vars(db_key))
keys.append(key)
return keys
\ No newline at end of file
import logging as log
from typing import List
from sqlalchemy.orm import Session
from pilab.crud.Util import from_json, update_attrs
from pilab.crud.Host import Host
from pilab.crud.Pi import Pi
from pilab.schemas.Host import HostType
from pilab.schemas.Pi import PiType
from pilab.schemas.Host import HostType
from pilab.schemas.Cube import CubesType
from pilab.events import meta
log.getLogger('sqlalchemy.engine').setLevel(log.WARNING)
class Pi(object):
@staticmethod
def create(db: Session, pi: meta.Pi):#DONE
host = Host.create(db, pi)
db_pi = from_json(PiType, pi.dict(), host_id=host.id)
db.add(db_pi)
db.flush()
pi = meta.Pi(**vars(db_pi), **host.dict())
return pi
@staticmethod
def update(db: Session, pi: meta.PiUpdate, pi_id: int):
db_pi = db.query(PiType).filter(
PiType.host_id == pi_id).first()
host = Host.update(db, pi, db_pi.host_id)
update_attrs(pi, db_pi)
db.flush()
pi = meta.Pi(**vars(db_pi), **host.dict())
return pi
@staticmethod
def update_image(db: Session, pi_id: int, image_id: int, user_data_id: int):
db_pi = db.query(PiType).filter(
PiType.host_id == pi_id).first()
db_pi.image_id = image_id
db_pi.user_data_id = user_data_id
db.flush()
@staticmethod
def update_playbook(db: Session, pi_id: int, playbook_id: int):
db_pi = db.query(PiType).filter(
PiType.host_id == pi_id).first()
db_pi.playbook_id = playbook_id
db.flush()
@staticmethod
def assign_image(db: Session, pi_id: int, image_id: int):
db_pi = db.query(PiType).filter(
PiType.host_id == pi_id).first()
db_pi.image_id = image_id
db.flush()
@staticmethod
def assign_host_key(db: Session, pi_id: int, key_id: int):
db_pi = db.query(PiType).filter(
PiType.host_id == pi_id).first()
db_pi.key_id = key_id
db.flush()
@staticmethod
def assign_data(db: Session, pi_id: int, user_data_id: int):
db_pi = db.query(PiType).filter(
PiType.host_id == pi_id).first()
db_pi.user_data_id = user_data_id
db.flush()
@staticmethod
def get(db: Session, id: int = None, serial : bytes = None, mac: int = None):#DONE
if id:
db_pi = db.query(PiType).filter(PiType.host_id == id).first()
elif serial:
db_pi = db.query(PiType).filter(PiType.serial == serial).first()
elif mac:
db_host = db.query(HostType).filter(HostType.mac == mac).first()
db_pi = db.query(PiType).filter(PiType.host_id == db_host.id).first()
host = Host.get(db, db_pi.host_id)
return meta.Pi(**vars(db_pi), **host.dict())
@staticmethod
def delete(db: Session, host_id: int):#DONE
db_pi = db.query(PiType).filter(
PiType.host_id == host_id
).first()
db.delete(db_pi)
db.flush()
Host.delete(db, db_pi.host_id)
return
@staticmethod
def is_valid_id(db: Session, host_id: int):
pi = db.query(PiType).filter(
PiType.host_id == host_id).first()
return True if pi else False
@staticmethod
def get_cube_id(db: Session, pi_id: int):
pi = Pi.get(db, pi_id)
cube_id = db.query(CubesType.id).filter(
CubesType.id == pi.cube_id).first()
return cube_id
@staticmethod
def getAll(db: Session, _ids: List[int] = None, cube_id: int = None, switch_id: int = None, controller_id: int = None, image_id: int= None):#DONE
if _ids:
ids = db.query(PiType.host_id).filter(PiType.host_id.in_(_ids)).all()
if len(ids) != len(_ids):
invalid_ids = set(ids) - {pi.id for pi in ids}
raise ValueError(f"Invalid ID(s) in the list: {invalid_ids}")
elif cube_id:
ids = db.query(PiType.host_id).filter(
PiType.cube_id == cube_id).all()
elif switch_id:
db_cube = db.query(CubesType).filter(
CubesType.switch_id == switch_id
).first()
ids = db.query(PiType.host_id).filter(
PiType.cube_id == db_cube.id
).all()
elif controller_id:
db_cube = db.query(CubesType).filter(
CubesType.controller_id == controller_id
).first()
ids = db.query(PiType.host_id).filter(
PiType.cube_id == db_cube.id
).all()
elif image_id:
ids = db.query(PiType.host_id).filter(PiType.image_id == image_id).all()
else:
ids = db.query(PiType.host_id).all()
pis = []
for i, in ids:
pi = Pi.get(db, i)
pis.append(pi)
return pis
@staticmethod
def get_ids(db: Session):#DONE
ids_tuple = db.query(PiType.host_id).all()
ids = []
for id, in ids_tuple:
ids.append(id)
return ids
\ No newline at end of file
import logging as log
from pydantic import BaseModel
from sqlalchemy import inspect
from sqlalchemy.orm import class_mapper
from pilab.schemas.Util import OffsetType
from sqlalchemy.orm import Session
log.getLogger('sqlalchemy.engine').setLevel(log.WARNING)
def from_json(model, data, **kwargs):
mapper = class_mapper(model)
keys = mapper.attrs.keys()
relationships = inspect(mapper).relationships
args = {k: v for k, v in data.items()
if k in keys and k not in relationships}
return model(**args, **kwargs)
def update_attrs(data: BaseModel, db_data):
for k, v in data.dict().items():
mapper = class_mapper(type(db_data))
if v is not None and k in mapper.attrs.keys():
setattr(db_data, k, v)
class Offset(object):
@staticmethod
def set(db: Session, stream_name: str, offset: int):
db_offset = db.query(OffsetType).filter(
OffsetType.stream_name == stream_name).first()
if not db_offset:
db_offset = OffsetType(stream_name=stream_name, stream_offset=offset)
db.add(db_offset)
else:
db_offset.stream_offset = offset
db.flush()
@staticmethod
def get(db: Session, stream_name:str):
db_offset = db.query(OffsetType).filter(
OffsetType.stream_name == stream_name).first()
return db_offset.stream_offset if db_offset else None
\ No newline at end of file
import binascii
import os
import socket
import struct
import logging
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy import types, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import ipaddress
logger = logging.getLogger(__name__)
DB_ADDRESS = os.getenv('DB_ADDRESS', '127.0.0.1')
DB_DATABASE = os.getenv('DB_DATABASE')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_USER = os.getenv('DB_USER')
SQLALCHEMY_DATABASE_URL = 'postgresql://' + DB_USER + ':' + DB_PASSWORD + '@' + DB_ADDRESS + '/' + DB_DATABASE
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
@contextmanager
def get_db_context():
"""Provide a transactional scope around a series of operations."""
db = SessionLocal()
try:
yield db
db.commit()
except:
db.rollback()
raise
finally:
db.close()
def get_db():
with get_db_context() as db:
yield db
def init_database():
try:
with get_db_context() as db:
with open("./init.sql") as file:
query = text(file.read())
db.execute(query)
except Exception as e:
logger.error(f"Failed to initialize database schema; message: {e}")
raise
class MAC(types.TypeDecorator):
impl = types.VARBINARY
def process_result_value(self, value, dialect):
if value is None:
return value
return binascii.hexlify(value, ':')
def process_bind_param(self, value, dialect):
if value is None:
return value
return binascii.unhexlify(value.replace(':', ''))
class SERIAL(types.TypeDecorator):
impl = types.VARBINARY
def process_result_value(self, value, dialect):
if value is None:
return value
r = bytes(value.hex(), 'utf-8')
return r
def process_bind_param(self, value, dialect):
if value is None:
return value
return bytes.fromhex(value.decode())
class IP(types.TypeDecorator):
impl = types.BIGINT
def process_result_value(self, value, dialect):
if value is None:
return value
return ipaddress.IPv4Address(socket.inet_ntoa(struct.pack('!L', value)))
def process_bind_param(self, value, dialect):
if value is None:
return value
packed_ip = socket.inet_aton(str(value))
return struct.unpack("!L", packed_ip)[0]
......@@ -53,7 +53,7 @@ class HostUpdate(Host):
__annotations__ = convert_to_optional(Host)
class Pi(Host):
class Pi_old(Host):
"""
representation of pi object with host id
"""
......@@ -61,6 +61,17 @@ class Pi(Host):
position: int
ssh_host_ed25519_key: Optional[str]
class Pi(Host):
"""
representation of pi object with host id
"""
serial: bytes
position: int
key_id: Optional[int]
image_id: Optional[int]
user_data_id: Optional[int]
playbook_id: Optional[int]
cube_id: int
class PiUpdate(HostUpdate):
"""
......@@ -78,7 +89,46 @@ class Cube(BaseModel):
switch: Switch
pis: List[Pi]
class UserData(BaseModel):
id: int
owner: str
data: str
class Image(BaseModel):
id: int
name: str
version: str
owner: Optional[str]
size: Optional[int]
public: bool
cloud_init: bool
script_id: Optional[int]
change: datetime
class Script(BaseModel):
id: int
name: str
owner: str
read_only: bool
script: str
script_chroot: str
class Key(BaseModel):
"""
representation of key object
"""
id: int
host_key: str
owner: str
class KeyBinding(BaseModel):
"""
representation of key_binding object
"""
pi_id: int
key_id: str
class EventType(str, Enum):
CREATE = 'create'
UPDATE = 'update'
......
from sqlalchemy import Column, Integer, ForeignKey
from pilab.database import Base, MAC, SERIAL, IP
class CubesType(Base):
__tablename__ = "cubes"
id = Column(Integer, primary_key=True)
switch_id = Column(Integer, ForeignKey('hosts.id'))
controller_id = Column(Integer, ForeignKey('hosts.id'))
\ No newline at end of file
from sqlalchemy import Column, Integer, ForeignKey, Boolean
from pilab.database import Base
class SwitchType(Base):
__tablename__ = "switches"
host_id = Column(Integer, ForeignKey('hosts.id'), primary_key=True)
poe = Column(Boolean)
class ControllerType(Base):
__tablename__ = "controllers"
host_id = Column(Integer, ForeignKey('hosts.id'), primary_key=True)
\ No newline at end of file
from sqlalchemy import Column, Integer, ForeignKey, String, Boolean, BigInteger, DateTime
from pilab.database import Base, MAC, SERIAL, IP
class HostType(Base):
__tablename__ = "hosts"
id = Column(Integer, primary_key=True)
mac = Column(MAC())
hostname = Column(String)
ipv4_address = Column(IP())
model = Column(String)
\ No newline at end of file
from sqlalchemy import Column, Integer, ForeignKey, String, Boolean, BigInteger, DateTime
from pilab.database import Base, MAC, SERIAL, IP
class ScriptType(Base):
__tablename__ = "scripts"
id = Column(Integer, primary_key=True)
name = Column(String)
owner = Column(String)
read_only = Column(Boolean)
script = Column(String)
script_chroot = Column(String)
class ImageType(Base):
__tablename__ = "images"
id = Column(Integer, primary_key=True)
name = Column(String)
version = Column(String)
owner = Column(String)
size = Column(Integer)
public = Column(Boolean)
cloud_init = Column(Boolean)
change = Column(DateTime)
script_id = Column(Integer, ForeignKey('scripts.id'))
class UserDataType(Base):
__tablename__ = "user_data"
id = Column(Integer, primary_key=True)
owner = Column(String)
data = Column(String)
\ No newline at end of file
from sqlalchemy import Column, Integer, String
from pilab.database import Base, MAC, SERIAL, IP
class KeyType(Base):
__tablename__ = "keys"
id = Column(Integer, primary_key=True)
owner = Column(String)
host_key = Column(String)
from sqlalchemy import Column, Integer, ForeignKey
from pilab.database import Base, MAC, SERIAL, IP
class PiType(Base):
__tablename__ = "pis"
host_id = Column(Integer, ForeignKey('hosts.id'), primary_key=True)
cube_id = Column(Integer, ForeignKey('cubes.id'), primary_key=True)
serial = Column(SERIAL())
position = Column(Integer)
user_data_id = Column(Integer, ForeignKey('user_data.id'))
image_id = Column(Integer, ForeignKey('images.id'))
playbook_id = Column(Integer)
key_id = Column(Integer, ForeignKey('keys.id'))
\ No newline at end of file
from sqlalchemy import Column, String, BigInteger
from pilab.database import Base
class OffsetType(Base):
__tablename__ = "stream_offsets"
stream_name = Column(String, primary_key=True)
stream_offset = Column(BigInteger)
\ No newline at end of file
......@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='pilab',
version='4.3.1',
version='5.0.0',
description='Shared-Libs for the pi-lab microservices',
url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab',
author='Max Reinheimer',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment