-
Johannes Hitzinger authored
started with trust chain, its a mess
Johannes Hitzinger authoredstarted with trust chain, its a mess
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
line.py 5.31 KiB
import datetime
import ipaddress
import time
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import argparse
import asyncio
from contextlib import asynccontextmanager, redirect_stdout
from fastapi import APIRouter, FastAPI, Body
import jsonpickle
import uvicorn
from apscheduler.jobstores.memory import MemoryJobStore
from util import get_pub_ip
import logging
from connection import Connection
from remotetransformer import RemoteTransformer
from remoteparticipant import RemoteParticipant
from remoteline import RemoteLine, SignedRemoteLine
logger = logging.getLogger(__name__)
class Line(Connection):
def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
super().__init__(cap, conips, sched)
self.adjacentConnections: set[RemoteTransformer | RemoteParticipant] = set()
self.lastSignedLine: dict[float, SignedRemoteLine] = dict()
# adding fastapi endpoints
self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteLineJSON, methods=["GET"])
self.fastRouter.add_api_route("/sign/{extime}", self.sign, methods=["POST"])
# setting up scheduling
run_date = datetime.datetime.now() + datetime.timedelta(hours=1,
seconds=5) # +1 hour because timezones suck hard
self.scheduler.add_job(self.retrieveConnections, 'date', run_date=run_date, id='1', )
async def retrieveConnections(self):
result: set[RemoteTransformer | RemoteParticipant] = set()
for ip in self.remoteIPs:
async with httpx.AsyncClient() as client:
response = await client.get("http://" + ip.__str__() + ":8000/asRemoteJSON")
result.add(jsonpickle.decode(response.json()))
self.adjacentConnections.update(result)
# logger.info("===> Line: " + self.publicIP.__str__() + " retrieved connections: " + len(result).__str__())
async def asRemoteLineJSON(self):
rl: RemoteLine = RemoteLine(self.publicIP, self.publicKey, self.availableCapacity,
self.usedCapacity, self.adjacentConnections, self.loss)
return jsonpickle.encode(rl)
async def sign(self, extime: float, body=Body()):
rl = jsonpickle.decode(body["remote"])
logger.info("===> Line: " + self.publicIP.__str__() + " received sign request for " + rl.publicIP.__str__())
if rl.publicKey == self.publicKey: # check if the rl actually is me
if self.checkCapacity(rl):
if extime not in self.lastSignedLine.keys():
origin = SignedRemoteLine(
RemoteLine(self.publicIP, self.publicKey, self.availableCapacity,
self.usedCapacity, self.adjacentConnections, self.loss), None)
origin.isOrigin = True
with redirect_stdout(None):
origin.signature = self.dil.sign_with_input(self.secretKey, origin.__repr__().encode())
result = SignedRemoteLine(rl, origin)
with redirect_stdout(None):
result.signature = self.dil.sign_with_input(self.secretKey, rl.__repr__().encode())
# logger.info("------------------->" + rl.__repr__().encode().__str__())
self.lastSignedLine[extime] = result
logger.info("===> Line: " + self.publicIP.__str__() + " signed first " + rl.publicIP.__str__())
return jsonpickle.encode(result)
else:
# if there has been a route announced before, add the previous to the new one
result = SignedRemoteLine(rl, self.lastSignedLine[extime])
with redirect_stdout(None):
result.signature = self.dil.sign_with_input(self.secretKey, rl.__repr__().encode())
self.lastSignedLine[extime] = result
logger.info("===> Line: " + self.publicIP.__str__() + " signed " + rl.publicIP.__str__())
return jsonpickle.encode(result)
else: # TODO: Rest Error handling
return "Not enough capacity" # better handling here would be nice
else:
return "Unauthorized" # better handling here would be nice
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Connection service')
parser.add_argument('--cap', type=float, required=True, help='Available capacity')
parser.add_argument('--cons', type=str, nargs='+', required=True,
help='List of IP addresses of adjacent connections')
args = parser.parse_args()
argcons = set(ipaddress.IPv4Address(ip) for ip in args.cons)
jobstores = {
'default': MemoryJobStore()
}
scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
logging.basicConfig(
level=logging.INFO,
handlers=[
logging.StreamHandler()
]
)
logger.info(argcons)
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler.start()
yield
scheduler.shutdown()
fast_app = FastAPI(lifespan=lifespan)
lin = Line(args.cap, argcons, scheduler)
fast_app.include_router(lin.fastRouter)
uvicorn.run(fast_app, host=get_pub_ip(), port=8000)