Newer
Older
import datetime
import ipaddress
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import argparse
import asyncio
from contextlib import asynccontextmanager
from fastapi import APIRouter, FastAPI
import jsonpickle
import uvicorn
from apscheduler.jobstores.memory import MemoryJobStore
from util import get_pub_ip
import logging
from connection import Connection
from remotetransformer import RemoteTransformer
from remoteparticipant import RemoteParticipant
from remoteline import RemoteLine, SignedRemoteLine
logger = logging.getLogger(__name__)
class Line(Connection):
    """Connection node that publishes itself as a ``RemoteLine`` and signs line announcements.

    On top of whatever ``Connection`` provides, a ``Line`` registers two HTTP
    routes on ``self.fastRouter`` and schedules a one-off job that asks every
    address in ``self.remoteIPs`` for its remote representation.

    NOTE(review): ``fastRouter``, ``scheduler``, ``remoteIPs``, ``publicIP``,
    ``publicKey``, ``availableCapacity``, ``usedCapacity``, ``loss`` and
    ``dil`` are never assigned in this class; presumably
    ``Connection.__init__`` sets them -- confirm against ``connection.py``.
    """

    def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler) -> None:
        """Set up state, register the HTTP routes and schedule the neighbour lookup.

        Args:
            cap: Capacity value, forwarded unchanged to ``Connection.__init__``.
            conips: IP addresses of the adjacent connections, forwarded unchanged.
            sched: Scheduler, forwarded unchanged; the lookup job is added to
                ``self.scheduler``.
        """
        super().__init__(cap, conips, sched)
        # Remote representations of the neighbours, filled by retrieveConnections().
        self.adjacentConnections: set[RemoteTransformer | RemoteParticipant] = set()
        # Most recent signed announcement per minute-truncated timestamp.
        # NOTE(review): entries are never removed, so this grows by one entry
        # for every distinct minute that gets signed.
        self.lastSignedLine: dict[datetime.datetime, SignedRemoteLine] = dict()
        # adding fastapi endpoints
        self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteLineJSON, methods=["GET"])
        self.fastRouter.add_api_route("/sign/{time}", self.sign, methods=["POST"])
        # setting up scheduling
        # A timezone-aware run date is unambiguous no matter which timezone the
        # scheduler or the host uses. The previous naive "now + 1 hour" only
        # lined up when the host clock was exactly one hour behind the
        # scheduler's timezone; otherwise the one-shot job ran an hour late or
        # was already in the past and therefore missed.
        run_date = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=5)
        self.scheduler.add_job(self.retrieveConnections, 'date', run_date=run_date, id='1')

    async def retrieveConnections(self) -> None:
        """Fetch the remote representation of every neighbour and remember it.

        Each address in ``self.remoteIPs`` is queried on port 8000 at
        ``/asRemoteJSON``. A neighbour that cannot be reached or answers with
        an HTTP error status is logged and skipped so that the remaining
        neighbours are still recorded.
        """
        result: set[RemoteTransformer | RemoteParticipant] = set()
        # One client for all requests instead of a new one per neighbour.
        async with httpx.AsyncClient() as client:
            for ip in self.remoteIPs:
                url = f"http://{ip!s}:8000/asRemoteJSON"
                try:
                    response = await client.get(url)
                    response.raise_for_status()
                except httpx.HTTPError as exc:
                    logger.warning("Could not retrieve connection info from %s: %s", url, exc)
                    continue
                # NOTE(security): jsonpickle.decode() can instantiate arbitrary
                # classes named in the payload; this is only safe while every
                # neighbour is trusted.
                result.add(jsonpickle.decode(response.json()))
        self.adjacentConnections.update(result)

    async def asRemoteLineJSON(self):
        """Return this line's current state as a jsonpickle-encoded ``RemoteLine``."""
        return jsonpickle.encode(self._as_remote_line())

    async def sign(self, time: datetime.datetime, rljson):
        """Sign a line announcement for the minute that contains ``time``.

        Args:
            time: Timestamp taken from the request path; seconds and
                microseconds are discarded before it is used as a key.
            rljson: jsonpickle-encoded object whose ``publicKey`` must equal
                this line's ``publicKey``.

        Returns:
            The jsonpickle-encoded ``SignedRemoteLine``, or the literal string
            ``"Unauthorized"`` when the public keys differ.
        """
        # NOTE(security): jsonpickle.decode() on request data can instantiate
        # arbitrary classes; only expose this endpoint to trusted callers.
        rl = jsonpickle.decode(rljson)
        if rl.publicKey != self.publicKey:  # check if the rl actually is me
            return "Unauthorized"  # better handling here would be nice

        time = time.replace(second=0, microsecond=0)  # we are scheduling in minute intervals
        previous = self.lastSignedLine.get(time)
        if previous is None:
            # First announcement for this minute: start the chain with a
            # signed snapshot of this line, marked as the origin.
            previous = SignedRemoteLine(self._as_remote_line(), None)
            previous.isOrigin = True
            previous.signature = self._sign_payload(previous)
        # If there has been a route announced before, the previous one is
        # attached to the new one.
        result = SignedRemoteLine(rl, previous)
        result.signature = self._sign_payload(result)
        self.lastSignedLine[time] = result
        return jsonpickle.encode(result)

    def _as_remote_line(self) -> RemoteLine:
        """Build a ``RemoteLine`` snapshot from this line's current attributes."""
        return RemoteLine(self.publicIP, self.publicKey, self.availableCapacity,
                          self.usedCapacity, self.adjacentConnections, self.loss)

    def _secret_key(self):
        """Return the secret signing key.

        Inside ``Line`` the expression ``self.__secretKey`` is name-mangled to
        ``self._Line__secretKey``, which this class never assigns. If the key
        is assigned as ``self.__secretKey`` inside ``Connection`` (not visible
        from this file -- confirm), it is stored as ``_Connection__secretKey``.
        Both spellings are tried so signing works in either case.
        NOTE(review): renaming the attribute to a single-underscore name in
        ``Connection`` would remove the need for this fallback.
        """
        try:
            return self.__secretKey
        except AttributeError:
            return getattr(self, "_Connection__secretKey")

    def _sign_payload(self, payload):
        """Sign the string form of ``payload`` with this line's secret key."""
        return self.dil.sign_with_input(self._secret_key(), str(payload))
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
if __name__ == "__main__":
    # Script entry point: parse the command line, build the scheduler and the
    # Line instance, then serve the Line's routes with uvicorn on port 8000.
    parser = argparse.ArgumentParser(description='Connection service')
    parser.add_argument('--cap', type=float, required=True, help='Available capacity')
    # argparse converts and validates each address, so a malformed value ends
    # in a normal usage error instead of an unhandled ValueError traceback.
    parser.add_argument('--cons', type=ipaddress.IPv4Address, nargs='+', required=True,
                        help='List of IP addresses of adjacent connections')
    args = parser.parse_args()
    argcons = set(args.cons)  # duplicates on the command line collapse here

    jobstores = {
        'default': MemoryJobStore(),
    }
    scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')

    logging.basicConfig(
        level=logging.INFO,
        handlers=[
            logging.StreamHandler(),
        ],
    )
    logger.info(argcons)

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Run the scheduler for exactly as long as the FastAPI app is alive."""
        scheduler.start()
        try:
            yield
        finally:
            # Also stop the scheduler when the application exits with an error.
            scheduler.shutdown()

    fast_app = FastAPI(lifespan=lifespan)
    lin = Line(args.cap, argcons, scheduler)
    fast_app.include_router(lin.fastRouter)
    uvicorn.run(fast_app, host=get_pub_ip(), port=8000)