-
Johannes Hitzinger authoredJohannes Hitzinger authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
main.py 2.59 KiB
import asyncio
import dataclasses
import ipaddress
import sys
import random
from contextlib import asynccontextmanager
import time
import jsonpickle
import numpy
import uvicorn
import logging
from fastapi import FastAPI, Path, APIRouter
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from pydantic import BaseModel
from typing import Annotated
import dilithium.dilithium
from dilithium import Dilithium
from pint import UnitRegistry
import settings
import participant
import DHTdummy
# Module-wide pint unit registry.
ureg = UnitRegistry()
# Short alias for the registry's Quantity class, e.g. Q_(5, ureg.volt).
Q_ = ureg.Quantity
# get ip of this device/container
import socket
def get_ip() -> str:
    """Return the local IPv4 address of this device/container.

    A UDP socket is "connected" to a dummy address and the local address
    the socket was bound to is read back. The dummy address does not even
    have to be reachable.

    Returns:
        The local IPv4 address as a dotted-quad string, or ``'127.0.0.1'``
        when the address cannot be determined (e.g. no usable route).
    """
    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        # Timeout 0 puts the socket in non-blocking mode so this never stalls.
        sock.settimeout(0)
        try:
            # doesn't even have to be reachable
            sock.connect(('10.254.254.254', 1))
            return sock.getsockname()[0]
        except OSError:
            # Socket-layer failure: fall back to loopback.
            return '127.0.0.1'
# --------------- Examples -----------------------
'''
v1 = Q_(5, ureg.volt)
a1 = Q_(4, ureg.amp)
p1 = v1 * a1
p1.ito(ureg.watt)
#print(p1)
d1 = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"])
pk, sk = d1.keygen([random.randint(0, numpy.iinfo('uint32').max), random.randint(0, numpy.iinfo('uint32').max),
random.randint(0, numpy.iinfo('uint32').max),
random.randint(0, numpy.iinfo('uint32').max)])
msg = b"this is a message"
sig = d1.sign_with_input(sk, msg)
result = d1.verify(pk, msg, sig)
#print(result)
'''
# Job store configuration for the scheduler: a single in-memory default store.
jobstores = dict(default=MemoryJobStore())
scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')

# Root logging: INFO level, emitted through a plain stream handler.
logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler()])
# Log through the 'uvicorn' logger.
logger = logging.getLogger('uvicorn')
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the module-level scheduler for the lifetime of the application.

    The scheduler is started before control is handed to the application
    and shut down again afterwards.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    scheduler.start()
    try:
        yield
    finally:
        # Shut the scheduler down even if an exception is raised at the yield.
        scheduler.shutdown()
# FastAPI application; `lifespan` is passed in as its lifespan handler.
fast_app = FastAPI(lifespan=lifespan)
# start the server
'''
if __name__ == "__main__":
@scheduler.scheduled_job('interval', seconds=100)
def scheduled_job_1():
logger.info("hello from " + get_ip())
part = participant.Participant(nid="test_network", dht_ep={ipaddress.IPv4Address(
'172.20.0.2')}, con_ip=ipaddress.IPv4Address('172.20.1.1')) # Network_ID should be a hash for real-world use; provide IP addresses of DHT endpoints for first discovery
#scheduler.add_job(part.requestremoteParticipants, 'interval', seconds=20, id='1', name='requestRemoteParticipants')
fast_app.include_router(part.fastRouter)
uvicorn.run(fast_app, host=get_ip(), port=8000)
'''