diff --git a/connection.py b/connection.py
index bbe38e398c63174d9cbc2cc6b56a28e1a823a00d..40c129bcef051e2dde9a872babfbd7fb3a9667a0 100644
--- a/connection.py
+++ b/connection.py
@@ -1,7 +1,5 @@
 import datetime
 import ipaddress
-import argparse
-import asyncio
 
 import httpx
 from dilithium import Dilithium
@@ -19,22 +17,7 @@ import logging
 
 from remoteconnection import RemoteConnection
 
-import socket
-
-
-def get_pub_ip():
-    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-    s.settimeout(0)
-    try:
-        # doesn't even have to be reachable
-        s.connect(('10.254.254.254', 1))
-        ip = s.getsockname()[0]
-    except Exception:
-        ip = '127.0.0.1'
-    finally:
-        s.close()
-    return ip
-
+from util import get_pub_ip
 
 logger = logging.getLogger(__name__)
 
@@ -42,80 +25,15 @@ logger = logging.getLogger(__name__)
 
 # TODO: is Connection (cable or transformer) involved in confirming trades? can this be prevented?
 class Connection:
-    def __init__(self, cap: float, cons: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
+    def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
         self.dil = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"])
-        self.__secretKey, self.__publicKey = self.dil.keygen(
+        self.__secretKey, self.publicKey = self.dil.keygen(
             [randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max),
              randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max)])
         self.fastRouter = APIRouter()
         self.scheduler = sched
-        self.remoteIPs: set[ipaddress.IPv4Address] = cons
-        self.adjacentConnections: set[RemoteConnection] = set()
+        self.remoteIPs: set[ipaddress.IPv4Address] = conips
         self.publicIP = get_pub_ip()
         self.availableCapacity = cap
         self.usedCapacity = 0
-        self.lineloss = 0.01  # this is a wrong assumption, actual line loss is depending on the load
-        # adding fastapi endpoints
-        self.fastRouter.add_api_route("/getConnections", self.getConnections, methods=["GET"])
-        self.fastRouter.add_api_route("/getCapacity", self.getCapacity, methods=["GET"])
-        self.fastRouter.add_api_route("/asRemoteConnectionJSON", self.asRemoteConnectionJSON, methods=["GET"])
-        # setting up scheduling
-        run_date = datetime.datetime.now() + datetime.timedelta(hours=1,
-                                                                seconds=5)  # +1 hour because timezones suck hard
-        self.scheduler.add_job(self.retrieveConnections, 'date', run_date=run_date, id='1', )
-
-    async def getConnections(self):
-        return jsonpickle.encode(self.adjacentConnections)
-
-    async def getCapacity(self):
-        return self
-
-    async def retrieveConnections(self):
-        result: set[RemoteConnection] = set()
-        for ip in self.remoteIPs:
-            async with httpx.AsyncClient() as client:
-                response = await client.get("http://" + ip.__str__() + ":8000/asRemoteConnectionJSON")
-                result.add(jsonpickle.decode(response.json()))
-
-        self.adjacentConnections.update(result)
-
-    async def asRemoteConnectionJSON(self):
-        rc: RemoteConnection = RemoteConnection(self.publicIP, self.__publicKey, self.availableCapacity,
-                                                self.usedCapacity, self.adjacentConnections)
-        return jsonpickle.encode(rc)
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description='Connection service')
-    parser.add_argument('--cap', type=float, required=True, help='Available capacity')
-    parser.add_argument('--cons', type=str, nargs='+', required=True,
-                        help='List of IP addresses of adjacent connections')
-    args = parser.parse_args()
-
-    argcons = set(ipaddress.IPv4Address(ip) for ip in args.cons)
-
-    jobstores = {
-        'default': MemoryJobStore()
-    }
-    scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
-    logging.basicConfig(
-        level=logging.INFO,
-        handlers=[
-            logging.StreamHandler()
-        ]
-    )
-
-    logger.info(argcons)
-
-
-    @asynccontextmanager
-    async def lifespan(app: FastAPI):
-        scheduler.start()
-        yield
-        scheduler.shutdown()
-
-
-    fast_app = FastAPI(lifespan=lifespan)
-    con = Connection(args.cap, argcons, scheduler)
-    fast_app.include_router(con.fastRouter)
-    uvicorn.run(fast_app, host=get_pub_ip(), port=8000)
+        self.loss = 0.01  # this is a wrong assumption, actual line loss depends on the load
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 831e61abb1471b5784f074845ac93567875f0508..1417fce06683c14b70e50b815ce596f3111fc13a 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -19,7 +19,7 @@ services:
     build:
       context: .
      dockerfile: /home/jonny0815/git/dismagr/dht/dockerfile
-  connection100:
+  transformer100:
    networks:
      network1:
        ipv4_address: 172.20.0.100
@@ -27,9 +27,9 @@ services:
      - 8100:8000
    build:
      context: .
-      dockerfile: /home/jonny0815/git/dismagr/connection/dockerfile
-    command: ["python", "./connection.py", "--cap", "1000", "--cons", "172.20.0.101", "172.20.0.102", "172.20.0.103"]
-  connection101:
+      dockerfile: /home/jonny0815/git/dismagr/transformer/dockerfile
+    command: ["python", "./transformer.py", "--cap", "1000", "--cons", "172.20.0.101", "172.20.0.102", "172.20.0.103"]
+  line101:
    networks:
      network1:
        ipv4_address: 172.20.0.101
@@ -37,9 +37,9 @@ services:
      - 8101:8000
    build:
      context: .
-      dockerfile: /home/jonny0815/git/dismagr/connection/dockerfile
-    command: [ "python", "./connection.py", "--cap", "1000" , "--cons", "172.20.0.100"]
-  connection102:
+      dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
+    command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100"]
+  line102:
    networks:
      network1:
        ipv4_address: 172.20.0.102
@@ -47,9 +47,9 @@ services:
      - 8102:8000
    build:
      context: .
-      dockerfile: /home/jonny0815/git/dismagr/connection/dockerfile
-    command: [ "python", "./connection.py", "--cap", "1000" , "--cons", "172.20.0.100" ]
-  connection103:
+      dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
+    command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100" ]
+  line103:
    networks:
      network1:
        ipv4_address: 172.20.0.103
@@ -57,8 +57,8 @@ services:
      - 8103:8000
    build:
      context: .
-      dockerfile: /home/jonny0815/git/dismagr/connection/dockerfile
-    command: [ "python", "./connection.py", "--cap", "1000" , "--cons", "172.20.0.100" ]
+      dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
+    command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100" ]
   part11:
    networks:
      network1:
diff --git a/line.py b/line.py
new file mode 100644
index 0000000000000000000000000000000000000000..aaf9bddfa9ec3463aefad7554466692d2b18795d
--- /dev/null
+++ b/line.py
@@ -0,0 +1,84 @@
+import datetime
+import ipaddress
+
+import httpx
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+import argparse
+import asyncio
+from contextlib import asynccontextmanager
+from fastapi import APIRouter, FastAPI
+import jsonpickle
+import uvicorn
+from apscheduler.jobstores.memory import MemoryJobStore
+from util import get_pub_ip
+import logging
+
+from connection import Connection
+from remotetransformer import RemoteTransformer
+from remoteparticipant import RemoteParticipant
+from remoteline import RemoteLine
+
+logger = logging.getLogger(__name__)
+
+
+class Line(Connection):
+    def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
+        super().__init__(cap, conips, sched)
+        self.adjacentConnections: set[RemoteTransformer | RemoteParticipant] = set()
+        # adding fastapi endpoints
+        self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteLineJSON, methods=["GET"])
+        # setting up scheduling
+        run_date = datetime.datetime.now() + datetime.timedelta(hours=1,
+                                                                seconds=5)  # +1 hour because timezones suck hard
+        self.scheduler.add_job(self.retrieveConnections, 'date', run_date=run_date, id='1', )
+
+    async def retrieveConnections(self):
+        result: set[RemoteTransformer | RemoteParticipant] = set()
+        for ip in self.remoteIPs:
+            async with httpx.AsyncClient() as client:
+                response = await client.get("http://" + ip.__str__() + ":8000/asRemoteJSON")
+                result.add(jsonpickle.decode(response.json()))
+
+        self.adjacentConnections.update(result)
+        # logger.info("===> Line: " + self.publicIP.__str__() + " retrieved connections: " + len(result).__str__())
+
+    async def asRemoteLineJSON(self):
+        rl: RemoteLine = RemoteLine(self.publicIP, self.publicKey, self.availableCapacity,
+                                    self.usedCapacity, self.adjacentConnections, self.loss)
+        return jsonpickle.encode(rl)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description='Connection service')
+    parser.add_argument('--cap', type=float, required=True, help='Available capacity')
+    parser.add_argument('--cons', type=str, nargs='+', required=True,
+                        help='List of IP addresses of adjacent connections')
+    args = parser.parse_args()
+
+    argcons = set(ipaddress.IPv4Address(ip) for ip in args.cons)
+
+    jobstores = {
+        'default': MemoryJobStore()
+    }
+    scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
+    logging.basicConfig(
+        level=logging.INFO,
+        handlers=[
+            logging.StreamHandler()
+        ]
+    )
+
+    logger.info(argcons)
+
+
+    @asynccontextmanager
+    async def lifespan(app: FastAPI):
+        scheduler.start()
+        yield
+        scheduler.shutdown()
+
+
+    fast_app = FastAPI(lifespan=lifespan)
+    lin = Line(args.cap, argcons, scheduler)
+    fast_app.include_router(lin.fastRouter)
+    uvicorn.run(fast_app, host=get_pub_ip(), port=8000)
diff --git a/connection/dockerfile b/line/dockerfile
similarity index 68%
rename from connection/dockerfile
rename to line/dockerfile
index 71ab8f0d9d005ce0e2dad0f5507015178a03b365..a329229925eb857aa91a279e93ea533af8028440 100644
--- a/connection/dockerfile
+++ b/line/dockerfile
@@ -1,6 +1,6 @@
 FROM python:3.12
 # Or any preferred Python version.
-ADD exchange.py offer.py request.py trade.py remoteparticipant.py connection.py remoteconnection.py route.py .
+ADD exchange.py offer.py request.py trade.py remoteparticipant.py connection.py remoteconnection.py route.py line.py remotetransformer.py remoteline.py util.py .
 RUN pip install numpy pint fastapi uvicorn dilithium jsonpickle requests httpx apscheduler rustworkx
-CMD ["python", "./connection.py"]
+CMD ["python", "./line.py"]
 # Or enter the name of your unique directory and parameter set.
\ No newline at end of file
diff --git a/main.py b/main.py
index fd0fe8c1cee4dfd0c526e3f7d7edfd7ac700f8e8..6e3809c768ca6cb566409e7e331eab10db3801ad 100644
--- a/main.py
+++ b/main.py
@@ -89,6 +89,7 @@ async def lifespan(app: FastAPI):
 
 fast_app = FastAPI(lifespan=lifespan)
 # start the server
+'''
 if __name__ == "__main__":
     @scheduler.scheduled_job('interval', seconds=100)
     def scheduled_job_1():
@@ -100,3 +101,4 @@ if __name__ == "__main__":
     #scheduler.add_job(part.requestremoteParticipants, 'interval', seconds=20, id='1', name='requestRemoteParticipants')
     fast_app.include_router(part.fastRouter)
     uvicorn.run(fast_app, host=get_ip(), port=8000)
+'''
diff --git a/participant.py b/participant.py
index 5008f92da68ff25ed36efa69df2b718dbbc1969c..c3e233059fa61b6e12ab6ac725971b7c05d043d9 100644
--- a/participant.py
+++ b/participant.py
@@ -20,12 +20,15 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from dilithium import Dilithium
 import dilithium.dilithium
 import numpy
+from util import get_pub_ip
 
 from offer import Offer
 from request import Request
 from exchange import Exchange
 from trade import Trade
 
 from remoteconnection import RemoteConnection
+from remoteline import RemoteLine
+from remotetransformer import RemoteTransformer
 from remoteparticipant import RemoteParticipant
 
@@ -34,22 +37,6 @@ import logging
 
 logger = logging.getLogger(__name__)
 
-import socket
-
-
-def get_pub_ip():
-    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-    s.settimeout(0)
-    try:
-        # doesn't even have to be reachable
-        s.connect(('10.254.254.254', 1))
-        ip = s.getsockname()[0]
-    except Exception:
-        ip = '127.0.0.1'
-    finally:
-        s.close()
-    return ip
-
 
 class Participant:
     def __init__(self, nid: str, dht_ep: set[ipaddress.IPv4Address], con_ip: ipaddress.IPv4Address,
@@ -64,8 +51,9 @@ class Participant:
         self.__dht_network_id = nid
         self.publicIP: ipaddress.IPv4Address = ipaddress.IPv4Address(
             ip)  # set to current ipv4, confirm before each new round
-        self.connection: RemoteConnection = None
-        self.grid = rx.PyGraph()
+        self.line: RemoteLine = None
+        self.grid: rx.PyGraph = rx.PyGraph()
+        self.gridindexes: dict[bytes, int] = dict()  # mapping of public keys to indexes in the grid Graph
         self.knownIPs: set[ipaddress.IPv4Address] = set()
         self.remoteParticipants: set[RemoteParticipant] = set()
         self.availableExchanges: set[
@@ -83,13 +71,14 @@ class Participant:
         self.fastRouter.add_api_route("/get_remoteparticipants_asJSON", self.get_remoteparticipants_asjson,
                                       methods=["GET"])
         self.fastRouter.add_api_route("/get_knownIPs", self.get_knownIPs, methods=["GET"])
+        self.fastRouter.add_api_route("/get_ownLine", self.get_ownline, methods=["GET"])
         # announce self on dht
         asyncio.run(self.dht_startup())
         # self.loop.run_in_executor(None, self.dht_startup)
         # get potential peers from dht
         asyncio.run(self.dht_update_peers())
         # retrieve information from my grid connection
-        asyncio.run(self.buildOwnConnection(con_ip))
+        asyncio.run(self.buildOwnLine(con_ip))
         # set up scheduling for tasks
         self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=20, id='1',
                                name='requestRemoteParticipants')
@@ -146,17 +135,40 @@ class Participant:
             self.knownIPs.clear()
             return
         else:  # continued peer updates without dht, periodic dht discovery should still happen to discover new users
-            logger.info("===>" + self.publicIP.__str__() + " has connection " + self.connection.publicIP.__str__())  # TODO
+            logger.info(
+                "===>" + self.publicIP.__str__() + " has connection " + self.line.publicIP.__str__())  # TODO
 
     async def get_knownIPs(self):
         return jsonpickle.encode(self.knownIPs)
 
-    async def buildOwnConnection(self, con_ip: ipaddress.IPv4Address):
-        time.sleep(1)  # sleep to make sure that the connection is already up
+    async def buildOwnLine(self, con_ip: ipaddress.IPv4Address):
+        await asyncio.sleep(1)  # sleep to make sure that the connection is already up
         async with httpx.AsyncClient() as client:
-            response = await client.get("http://" + con_ip.__str__() + ":8000/asRemoteConnectionJSON")
+            response = await client.get("http://" + con_ip.__str__() + ":8000/asRemoteJSON")
+
+        self.line = jsonpickle.decode(response.json())
+        logger.info("===> finished buildOwnLine on " + self.publicIP.__str__() + " with " + self.line.publicIP.__str__())
 
-        self.connection = jsonpickle.decode(response.json())
+    async def get_ownline(self):
+        return jsonpickle.encode(self.line)
+
+    async def discoverGrid(self):  # this is not robust
+        pass
+        '''
+        i: int = self.grid.add_node(self.connection)
+        self.gridindexes[self.connection.publicKey] = i
+        await self.discoverFromNode(self.connection.publicKey)
+        '''
+
+    async def discoverFromNode(self, nodepublickey: bytes):
+        for con in self.grid[self.gridindexes[nodepublickey]].adjacent_connections:
+            async with httpx.AsyncClient() as client:
+                response = await client.get("http://" + con.publicIP.__str__() + ":8000/asRemoteConnectionJSON")
+                rc = jsonpickle.decode(response.json())
+                if rc.publicKey not in self.gridindexes:
+                    i: int = self.grid.add_node(rc)
+                    self.gridindexes[rc.publicKey] = i
+                    self.grid.add_edge(self.gridindexes[nodepublickey], i, con.loss)
 
 
 if __name__ == "__main__":
diff --git a/participant/dockerfile b/participant/dockerfile
index 8964341c68ec2cf1a401439d5ebdcaa29fcb6137..3c0bd3e4d7f388558f3851c9ae17eab1339a7954 100644
--- a/participant/dockerfile
+++ b/participant/dockerfile
@@ -1,6 +1,6 @@
 FROM python:3.12
 # Or any preferred Python version.
-ADD exchange.py offer.py participant.py request.py trade.py remoteparticipant.py settings.py remoteconnection.py route.py .
+ADD exchange.py offer.py participant.py request.py trade.py remoteparticipant.py settings.py remoteconnection.py remotetransformer.py remoteline.py route.py util.py .
 RUN pip install numpy pint fastapi uvicorn dilithium jsonpickle requests httpx apscheduler rustworkx
 CMD ["python", "./participant.py"]
 # Or enter the name of your unique directory and parameter set.
\ No newline at end of file
diff --git a/remoteconnection.py b/remoteconnection.py
index 797852675f8017320226fa0124d6edfa51e59bb5..437ee0b5c63b8794f20d33055b7196de01c85383 100644
--- a/remoteconnection.py
+++ b/remoteconnection.py
@@ -3,11 +3,16 @@ import time
 
 
 class RemoteConnection:
-    def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float, cons: set['RemoteConnection']):
+    def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float, l):
         self.publicIP: ipaddress.IPv4Address = ip
         self.publicKey: bytes = pk
         self.__availableCapacity: float = cap
         self.__usedCapacity: float = ucap
+        self.loss = l  # loss rate during transmission or transformation
         self.timestamp = time.time_ns()
 
-        self.adjacentConnections: set[RemoteConnection] = cons
+    def __eq__(self, other):
+        return self.publicKey == other.publicKey
+
+    def __hash__(self):
+        return hash(self.publicKey)
diff --git a/remoteline.py b/remoteline.py
new file mode 100644
index 0000000000000000000000000000000000000000..880257fd539d88aa6811b65704010af745a58da9
--- /dev/null
+++ b/remoteline.py
@@ -0,0 +1,13 @@
+import ipaddress
+import time
+
+import remoteconnection
+import remotetransformer
+import remoteparticipant
+
+
+class RemoteLine(remoteconnection.RemoteConnection):
+    def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float,
+                 cons, ll):
+        super().__init__(ip, pk, cap, ucap, ll)
+        self.adjacentConnections = cons
diff --git a/remoteparticipant.py b/remoteparticipant.py
index 8b5a5dcbc2f3f3d2d0d8c7346d4a0a2ffd4dcc7a..e3b47f593177291e026bd0f0a4fb8712a2fb2bd1 100644
--- a/remoteparticipant.py
+++ b/remoteparticipant.py
@@ -36,3 +36,9 @@
             return True
         else:
             return False
+
+    def __eq__(self, other):
+        return self.publicKey == other.publicKey
+
+    def __hash__(self):
+        return hash(self.publicKey)
\ No newline at end of file
diff --git a/remotetransformer.py b/remotetransformer.py
new file mode 100644
index 0000000000000000000000000000000000000000..125e18d0bda24ce47effd409b2dd67a3444bacaf
--- /dev/null
+++ b/remotetransformer.py
@@ -0,0 +1,12 @@
+import ipaddress
+import time
+
+import remoteconnection
+import remoteline
+
+
+class RemoteTransformer(remoteconnection.RemoteConnection):
+    def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float,
+                 cons: set['remoteline.RemoteLine'], ll):
+        super().__init__(ip, pk, cap, ucap, ll)
+        self.adjacentConnections: set[remoteline.RemoteLine] = cons
diff --git a/transformer.py b/transformer.py
new file mode 100644
index 0000000000000000000000000000000000000000..3dd9ed68efd82be04f0641ae209f1bbdd5cc76df
--- /dev/null
+++ b/transformer.py
@@ -0,0 +1,83 @@
+import datetime
+import ipaddress
+
+import httpx
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+import argparse
+import asyncio
+from contextlib import asynccontextmanager
+from fastapi import APIRouter, FastAPI
+import jsonpickle
+import uvicorn
+from apscheduler.jobstores.memory import MemoryJobStore
+from util import get_pub_ip
+import logging
+
+from connection import Connection
+from remotetransformer import RemoteTransformer
+from remoteline import RemoteLine
+
+logger = logging.getLogger(__name__)
+
+
+class Transformer(Connection):
+    def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
+        super().__init__(cap, conips, sched)
+        self.adjacentLines: set[RemoteLine] = set()
+        # adding fastapi endpoints
+        self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteTransformerJSON, methods=["GET"])
+        # setting up scheduling
+        run_date = datetime.datetime.now() + datetime.timedelta(hours=1,
+                                                                seconds=5)  # +1 hour because timezones suck hard
+        self.scheduler.add_job(self.retrieveConnections, 'date', run_date=run_date, id='1', )
+
+    async def asRemoteTransformerJSON(self):
+        rt: RemoteTransformer = RemoteTransformer(self.publicIP, self.publicKey, self.availableCapacity,
+                                                  self.usedCapacity, self.adjacentLines, self.loss)
+        return jsonpickle.encode(rt)
+
+    async def retrieveConnections(self):
+        result: set[RemoteLine] = set()
+        for ip in self.remoteIPs:
+            async with httpx.AsyncClient() as client:
+                response = await client.get("http://" + ip.__str__() + ":8000/asRemoteJSON")
+                result.add(jsonpickle.decode(response.json()))
+
+        self.adjacentLines.update(result)
+        logger.info("===> Transformer: " + self.publicIP.__str__() + " retrieved connections: " + len(result).__str__())
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description='Connection service')
+    parser.add_argument('--cap', type=float, required=True, help='Available capacity')
+    parser.add_argument('--cons', type=str, nargs='+', required=True,
+                        help='List of IP addresses of adjacent connections')
+    args = parser.parse_args()
+
+    argcons = set(ipaddress.IPv4Address(ip) for ip in args.cons)
+
+    jobstores = {
+        'default': MemoryJobStore()
+    }
+    scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
+    logging.basicConfig(
+        level=logging.INFO,
+        handlers=[
+            logging.StreamHandler()
+        ]
+    )
+
+    logger.info(argcons)
+
+
+    @asynccontextmanager
+    async def lifespan(app: FastAPI):
+        scheduler.start()
+        yield
+        scheduler.shutdown()
+
+
+    fast_app = FastAPI(lifespan=lifespan)
+    tra = Transformer(args.cap, argcons, scheduler)
+    fast_app.include_router(tra.fastRouter)
+    uvicorn.run(fast_app, host=get_pub_ip(), port=8000)
diff --git a/transformer/dockerfile b/transformer/dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..1603e69b8bd25f05f89e8081b2cc2b7b5a905361
--- /dev/null
+++ b/transformer/dockerfile
@@ -0,0 +1,6 @@
+FROM python:3.12
+# Or any preferred Python version.
+ADD exchange.py offer.py request.py trade.py remoteparticipant.py connection.py remoteconnection.py route.py transformer.py remoteline.py remotetransformer.py util.py .
+RUN pip install numpy pint fastapi uvicorn dilithium jsonpickle requests httpx apscheduler rustworkx
+CMD ["python", "./transformer.py"]
+# Or enter the name of your unique directory and parameter set.
\ No newline at end of file
diff --git a/util.py b/util.py
new file mode 100644
index 0000000000000000000000000000000000000000..5df2296ca207e54c4a7ebed39e28c60b1a1a714f
--- /dev/null
+++ b/util.py
@@ -0,0 +1,15 @@
+import socket
+
+
+def get_pub_ip():
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    s.settimeout(0)
+    try:
+        # doesn't even have to be reachable
+        s.connect(('10.254.254.254', 1))
+        ip = s.getsockname()[0]
+    except Exception:
+        ip = '127.0.0.1'
+    finally:
+        s.close()
+    return ip