Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
line.py 5.31 KiB
import datetime
import ipaddress
import time

import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import argparse
import asyncio
from contextlib import asynccontextmanager, redirect_stdout
from fastapi import APIRouter, FastAPI, Body
import jsonpickle
import uvicorn
from apscheduler.jobstores.memory import MemoryJobStore
from util import get_pub_ip
import logging

from connection import Connection
from remotetransformer import RemoteTransformer
from remoteparticipant import RemoteParticipant
from remoteline import RemoteLine, SignedRemoteLine

# Module-level logger named after this module; handlers and level are set up
# by logging.basicConfig() in the __main__ block below.
logger = logging.getLogger(__name__)


class Line(Connection):
    """Line node that publishes its own state and signs route announcements.

    On top of whatever ``Connection`` sets up, a ``Line`` registers two
    endpoints on ``self.fastRouter``:

    * ``GET /asRemoteJSON`` -- this line as a jsonpickle-encoded ``RemoteLine``.
    * ``POST /sign/{extime}`` -- sign a ``RemoteLine`` sent by the caller.

    It also schedules a one-off job that fetches the remote representation of
    every peer in ``self.remoteIPs`` a few seconds after construction.

    NOTE(review): ``publicIP``, ``publicKey``, ``secretKey``, ``dil``,
    ``availableCapacity``, ``usedCapacity``, ``loss``, ``remoteIPs``,
    ``fastRouter``, ``scheduler`` and ``checkCapacity`` are not defined in this
    class; presumably they are provided by ``Connection`` -- confirm there.
    """

    def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
        """Create the line, register its endpoints and schedule peer discovery.

        Args:
            cap: Capacity value forwarded unchanged to ``Connection``.
            conips: IP addresses of adjacent connections, forwarded to
                ``Connection``.
            sched: Scheduler forwarded to ``Connection``; the discovery job is
                added to ``self.scheduler``.
        """
        super().__init__(cap, conips, sched)
        # Remote views of the neighbours, filled in by retrieveConnections().
        self.adjacentConnections: set[RemoteTransformer | RemoteParticipant] = set()
        # Most recent signed line per ``extime`` path parameter of /sign.
        self.lastSignedLine: dict[float, SignedRemoteLine] = dict()
        # adding fastapi endpoints
        self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteLineJSON, methods=["GET"])
        self.fastRouter.add_api_route("/sign/{extime}", self.sign, methods=["POST"])
        # setting up scheduling
        # Use a timezone-aware UTC timestamp so the run date is unambiguous no
        # matter which timezone the scheduler is configured with.  The former
        # naive ``now() + 1 hour`` only compensated for a fixed one-hour offset
        # between the system clock and the scheduler timezone and was an hour
        # off whenever that offset differed (e.g. during daylight-saving time).
        run_date = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=5)
        self.scheduler.add_job(self.retrieveConnections, 'date', run_date=run_date, id='1')

    def _currentRemoteLine(self) -> RemoteLine:
        """Build a ``RemoteLine`` snapshot from this line's current attributes."""
        return RemoteLine(self.publicIP, self.publicKey, self.availableCapacity,
                          self.usedCapacity, self.adjacentConnections, self.loss)

    def _signSilently(self, target):
        """Sign ``repr(target)`` with ``self.secretKey`` while stdout is muted."""
        # redirect_stdout(None) silences anything the signer prints.
        with redirect_stdout(None):
            return self.dil.sign_with_input(self.secretKey, repr(target).encode())

    async def retrieveConnections(self):
        """Fetch ``/asRemoteJSON`` from every peer and remember the results.

        Each address in ``self.remoteIPs`` is queried on port 8000.  A peer
        that cannot be reached, answers with an error status or returns a body
        that is not valid JSON is logged and skipped, so one bad peer no longer
        discards the answers of all the others.  Successfully decoded objects
        are merged into ``self.adjacentConnections``.
        """
        result: set[RemoteTransformer | RemoteParticipant] = set()
        # One client for all peers instead of a new one per request.
        async with httpx.AsyncClient() as client:
            for ip in self.remoteIPs:
                url = f"http://{ip!s}:8000/asRemoteJSON"
                try:
                    response = await client.get(url)
                    response.raise_for_status()
                    # SECURITY: jsonpickle.decode can execute arbitrary code
                    # when fed untrusted data; only use with trusted peers.
                    result.add(jsonpickle.decode(response.json()))
                except (httpx.HTTPError, ValueError) as exc:
                    logger.warning("===> Line: %s could not retrieve connection from %s: %s",
                                   self.publicIP, ip, exc)

        self.adjacentConnections.update(result)
        # logger.info("===> Line: " + self.publicIP.__str__() + " retrieved connections: " + len(result).__str__())

    async def asRemoteLineJSON(self):
        """Return this line as a jsonpickle-encoded ``RemoteLine`` string."""
        rl: RemoteLine = self._currentRemoteLine()
        return jsonpickle.encode(rl)

    async def sign(self, extime: float, body=Body()):
        """Sign the ``RemoteLine`` contained in the request body.

        Args:
            extime: Path parameter used as the key under which the most recent
                signed line is remembered in ``self.lastSignedLine``.
            body: Request body; ``body["remote"]`` must hold a
                jsonpickle-encoded object exposing ``publicKey`` and
                ``publicIP``.

        Returns:
            The jsonpickle-encoded ``SignedRemoteLine`` on success, the plain
            string ``"Unauthorized"`` when the submitted public key is not this
            line's key, or ``"Not enough capacity"`` when ``checkCapacity``
            rejects the request.
        """
        # SECURITY: jsonpickle.decode can execute arbitrary code when fed
        # untrusted data; this endpoint must only be reachable by trusted peers.
        rl = jsonpickle.decode(body["remote"])
        logger.info("===> Line: %s received sign request for %s", self.publicIP, rl.publicIP)

        if rl.publicKey != self.publicKey:  # the submitted line is not me
            return "Unauthorized"  # better handling here would be nice
        if not self.checkCapacity(rl):  # TODO: Rest Error handling
            return "Not enough capacity"  # better handling here would be nice

        if extime in self.lastSignedLine:
            # A route was announced for this extime before: chain onto it.
            predecessor = self.lastSignedLine[extime]
            verb = "signed"
        else:
            # First request for this extime: start the chain with a signed
            # snapshot of this line, marked as the origin.
            predecessor = SignedRemoteLine(self._currentRemoteLine(), None)
            predecessor.isOrigin = True
            predecessor.signature = self._signSilently(predecessor)
            verb = "signed first"

        result = SignedRemoteLine(rl, predecessor)
        result.signature = self._signSilently(rl)
        self.lastSignedLine[extime] = result
        logger.info("===> Line: %s %s %s", self.publicIP, verb, rl.publicIP)
        return jsonpickle.encode(result)


# Script entry point: parse the command line, create the scheduler and the
# Line instance, and serve its router with uvicorn on port 8000.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Connection service')
    parser.add_argument('--cap', type=float, required=True, help='Available capacity')
    # Let argparse convert and validate each address, so a malformed value is
    # reported as a normal usage error instead of an uncaught exception.
    parser.add_argument('--cons', type=ipaddress.IPv4Address, nargs='+', required=True,
                        help='List of IP addresses of adjacent connections')
    args = parser.parse_args()

    # De-duplicate the adjacent addresses.
    argcons = set(args.cons)

    jobstores = {
        'default': MemoryJobStore()
    }
    scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
    logging.basicConfig(
        level=logging.INFO,
        handlers=[
            logging.StreamHandler()
        ]
    )

    logger.info(argcons)


    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Run the scheduler for exactly as long as the FastAPI app is alive."""
        scheduler.start()
        try:
            yield
        finally:
            # Shut the scheduler down even if an exception is propagated
            # through the lifespan context.
            scheduler.shutdown()


    fast_app = FastAPI(lifespan=lifespan)
    lin = Line(args.cap, argcons, scheduler)
    fast_app.include_router(lin.fastRouter)
    uvicorn.run(fast_app, host=get_pub_ip(), port=8000)