Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
main.py 2.59 KiB
import asyncio
import dataclasses
import ipaddress
import sys
import random
from contextlib import asynccontextmanager
import time

import jsonpickle
import numpy
import uvicorn
import logging
from fastapi import FastAPI, Path, APIRouter
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.memory import MemoryJobStore

from pydantic import BaseModel
from typing import Annotated
import dilithium.dilithium
from dilithium import Dilithium

from pint import UnitRegistry

import settings
import participant
import DHTdummy

ureg = UnitRegistry()
Q_ = ureg.Quantity

# get ip of this device/container
import socket


def get_ip():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.settimeout(0)
    try:
        # doesn't even have to be reachable
        s.connect(('10.254.254.254', 1))
        IP = s.getsockname()[0]
    except Exception:
        IP = '127.0.0.1'
    finally:
        s.close()
    return IP


# --------------- Examples -----------------------


'''
v1 = Q_(5, ureg.volt)
a1 = Q_(4, ureg.amp)
p1 = v1 * a1
p1.ito(ureg.watt)
#print(p1)

d1 = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"])
pk, sk = d1.keygen([random.randint(0, numpy.iinfo('uint32').max), random.randint(0, numpy.iinfo('uint32').max),
                    random.randint(0, numpy.iinfo('uint32').max),
                    random.randint(0, numpy.iinfo('uint32').max)])
msg = b"this is a message"
sig = d1.sign_with_input(sk, msg)
result = d1.verify(pk, msg, sig)
#print(result)
'''
jobstores = {
    'default': MemoryJobStore()
}
scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
logging.basicConfig(
    level=logging.INFO,
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('uvicorn')


@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.start()
    yield
    scheduler.shutdown()


fast_app = FastAPI(lifespan=lifespan)

# start the server
'''
if __name__ == "__main__":
    @scheduler.scheduled_job('interval', seconds=100)
    def scheduled_job_1():
        logger.info("hello from " + get_ip())


    part = participant.Participant(nid="test_network", dht_ep={ipaddress.IPv4Address(
        '172.20.0.2')}, con_ip=ipaddress.IPv4Address('172.20.1.1'))  # Network_ID should be a Hash for real world use, provide ip adresses of dht endpoints for first discovery
    #scheduler.add_job(part.requestremoteParticipants, 'interval', seconds=20, id='1', name='requestRemoteParticipants')
    fast_app.include_router(part.fastRouter)
    uvicorn.run(fast_app, host=get_ip(), port=8000)
'''