Skip to content
Snippets Groups Projects
line.py 4.55 KiB
Newer Older
  • Learn to ignore specific revisions
  • import datetime
    import ipaddress
    
    import httpx
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    import argparse
    import asyncio
    from contextlib import asynccontextmanager
    from fastapi import APIRouter, FastAPI
    import jsonpickle
    import uvicorn
    from apscheduler.jobstores.memory import MemoryJobStore
    from util import get_pub_ip
    import logging
    
    from connection import Connection
    from remotetransformer import RemoteTransformer
    from remoteparticipant import RemoteParticipant
    
    from remoteline import RemoteLine, SignedRemoteLine
    
    
    logger = logging.getLogger(__name__)
    
    
    class Line(Connection):
        """A ``Connection`` that tracks its adjacent remote nodes and signs ``RemoteLine`` records.

        On construction it registers two FastAPI routes on ``self.fastRouter``
        (``GET /asRemoteJSON`` and ``POST /sign/{time}``) and schedules a one-shot
        ``retrieveConnections`` job on ``self.scheduler``.

        Attributes such as ``fastRouter``, ``scheduler``, ``remoteIPs``, ``publicIP``,
        ``publicKey``, ``availableCapacity``, ``usedCapacity``, ``loss`` and ``dil`` are
        not assigned here; they are presumably provided by ``Connection`` — TODO confirm.
        """

        def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
            """Initialise the line, register its HTTP routes and schedule peer discovery.

            Args:
                cap: Forwarded unchanged to ``Connection.__init__``.
                conips: Forwarded unchanged to ``Connection.__init__``.
                sched: Forwarded unchanged to ``Connection.__init__``.
            """
            super().__init__(cap, conips, sched)
            # Remote objects decoded from the peers' /asRemoteJSON responses.
            self.adjacentConnections: set[RemoteTransformer | RemoteParticipant] = set()

            # Latest signed record per minute-truncated timestamp (see ``sign``).
            self.lastSignedLine: dict[datetime.datetime, SignedRemoteLine] = {}

            # adding fastapi endpoints
            self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteLineJSON, methods=["GET"])
            self.fastRouter.add_api_route("/sign/{time}", self.sign, methods=["POST"])

            # setting up scheduling
            # A timezone-aware timestamp makes the job fire ~5 s from now regardless of
            # the scheduler's configured timezone. The earlier naive "now + 1 hour"
            # workaround depended on the offset between the host clock and the
            # scheduler timezone, which changes with daylight saving time.
            run_date = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=5)
            self.scheduler.add_job(self.retrieveConnections, 'date', run_date=run_date, id='1')

        def _signingKey(self):
            """Return the secret key passed to ``self.dil.sign_with_input``.

            Inside this class ``self.__secretKey`` is name-mangled to
            ``self._Line__secretKey``, an attribute this class never assigns. The key is
            presumably assigned in ``Connection`` as ``self.__secretKey`` (stored as
            ``_Connection__secretKey``) — TODO confirm against connection.py. The
            original spelling is kept as a fallback so existing behaviour is preserved
            if the attribute does exist under the ``Line`` mangling.
            """
            try:
                return self._Connection__secretKey
            except AttributeError:
                return self.__secretKey

        def _asRemoteLine(self) -> RemoteLine:
            """Build a ``RemoteLine`` from this line's current attributes."""
            return RemoteLine(self.publicIP, self.publicKey, self.availableCapacity,
                              self.usedCapacity, self.adjacentConnections, self.loss)

        async def retrieveConnections(self):
            """Fetch ``/asRemoteJSON`` from every address in ``self.remoteIPs``.

            Each successfully decoded object is added to ``self.adjacentConnections``.
            A peer that cannot be reached, answers with an HTTP error status or returns
            a non-JSON body is logged and skipped, so one failing peer no longer aborts
            discovery of the others.
            """
            result: set[RemoteTransformer | RemoteParticipant] = set()
            # One client is shared by all requests instead of opening one per peer.
            async with httpx.AsyncClient() as client:
                for ip in self.remoteIPs:
                    url = f"http://{ip!s}:8000/asRemoteJSON"
                    try:
                        response = await client.get(url)
                        response.raise_for_status()
                        payload = response.json()
                    except (httpx.HTTPError, ValueError):
                        logger.exception("Line: could not retrieve connection from %s", url)
                        continue
                    # SECURITY NOTE(review): jsonpickle.decode can instantiate arbitrary
                    # classes; it is only safe if every peer in remoteIPs is trusted.
                    result.add(jsonpickle.decode(payload))

            self.adjacentConnections.update(result)

        async def asRemoteLineJSON(self):
            """Return this line as a jsonpickle-encoded ``RemoteLine``."""
            return jsonpickle.encode(self._asRemoteLine())

        async def sign(self, time: datetime.datetime, rljson):
            """Sign a ``RemoteLine`` announcement for the minute containing ``time``.

            Args:
                time: Timestamp of the announcement; seconds and microseconds are
                    dropped before it is used as the key into ``self.lastSignedLine``.
                rljson: jsonpickle-encoded object whose ``publicKey`` must equal
                    ``self.publicKey``.

            Returns:
                The jsonpickle-encoded ``SignedRemoteLine`` that was stored for that
                minute, or the plain string ``"Unauthorized"`` when the public keys do
                not match (returned as a normal value; no HTTP error is raised here).
            """
            # SECURITY NOTE(review): rljson comes from the HTTP request and
            # jsonpickle.decode can instantiate arbitrary classes — consider a safer
            # wire format for untrusted callers.
            rl = jsonpickle.decode(rljson)
            if rl.publicKey != self.publicKey:  # check if the rl actually is me
                return "Unauthorized"  # better handling here would be nice

            time = time.replace(second=0, microsecond=0)  # we are scheduling in minute intervals
            secret = self._signingKey()

            previous = self.lastSignedLine.get(time)
            if previous is None:
                # First announcement for this minute: start the chain with a signed
                # origin record describing this line itself.
                previous = SignedRemoteLine(self._asRemoteLine(), None)
                previous.isOrigin = True
                previous.signature = self.dil.sign_with_input(secret, previous.__str__())

            # The new record wraps the previous one (the origin, or the record from an
            # earlier announcement in the same minute) and is signed in turn.
            result = SignedRemoteLine(rl, previous)
            result.signature = self.dil.sign_with_input(secret, result.__str__())
            self.lastSignedLine[time] = result
            return jsonpickle.encode(result)
    
    
    
    if __name__ == "__main__":
        # Script entry point: parse the CLI arguments, build a scheduler and a Line,
        # and serve the Line's router with uvicorn on port 8000.
        parser = argparse.ArgumentParser(description='Connection service')
        parser.add_argument('--cap', type=float, required=True, help='Available capacity')
        # Letting argparse convert each value means a malformed address is reported
        # as a normal usage error instead of an uncaught ValueError traceback.
        parser.add_argument('--cons', type=ipaddress.IPv4Address, nargs='+', required=True,
                            help='List of IP addresses of adjacent connections')
        args = parser.parse_args()

        argcons = set(args.cons)

        jobstores = {
            'default': MemoryJobStore()
        }
        scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
        logging.basicConfig(
            level=logging.INFO,
            handlers=[
                logging.StreamHandler()
            ]
        )

        logger.info(argcons)


        @asynccontextmanager
        async def lifespan(app: FastAPI):
            """Start the scheduler when the app starts and shut it down when it stops."""
            scheduler.start()
            try:
                yield
            finally:
                # Runs even when an exception or cancellation is thrown in at the
                # yield, so the scheduler is always shut down.
                scheduler.shutdown()


        fast_app = FastAPI(lifespan=lifespan)
        # Line.__init__ schedules its discovery job on `scheduler`; the scheduler is
        # only started by the lifespan hook above.
        lin = Line(args.cap, argcons, scheduler)
        fast_app.include_router(lin.fastRouter)
        uvicorn.run(fast_app, host=get_pub_ip(), port=8000)