diff --git a/docker-compose.yaml b/docker-compose.yaml index d8514483450743c537fcddc1594562e1253c9534..f5148257e3d8dcda653d841ddb540247eb135d7b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -19,16 +19,36 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/dht/dockerfile - transformer100: + transformer200: networks: network1: - ipv4_address: 172.20.0.100 + ipv4_address: 172.20.0.200 ports: - - 8100:8000 + - 8200:8000 build: context: . dockerfile: /home/jonny0815/git/dismagr/transformer/dockerfile - command: ["python", "./transformer.py", "--cap", "1000", "--cons", "172.20.0.101", "172.20.0.102", "172.20.0.103"] + command: ["python", "./transformer.py", "--cap", "1000", "--cons", "172.20.0.101", "172.20.0.102", "172.20.0.103" ] + transformer201: + networks: + network1: + ipv4_address: 172.20.0.201 + ports: + - 8201:8000 + build: + context: . + dockerfile: /home/jonny0815/git/dismagr/transformer/dockerfile + command: [ "python", "./transformer.py", "--cap", "1000", "--cons", "172.20.0.104", "172.20.0.103" ] + transformer202: + networks: + network1: + ipv4_address: 172.20.0.202 + ports: + - 8202:8000 + build: + context: . + dockerfile: /home/jonny0815/git/dismagr/transformer/dockerfile + command: [ "python", "./transformer.py", "--cap", "1000", "--cons", "172.20.0.104", "172.20.0.102"] line101: networks: network1: @@ -38,7 +58,7 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/line/dockerfile - command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100"] + command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.200"] line102: networks: network1: @@ -48,7 +68,7 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/line/dockerfile - command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100" ] + command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.200", "172.20.0.202" ] line103: networks: network1: @@ -58,7 +78,37 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/line/dockerfile - command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100" ] + command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.200" , "172.20.0.201"] + line104: + networks: + network1: + ipv4_address: 172.20.0.104 + ports: + - 8104:8000 + build: + context: . + dockerfile: /home/jonny0815/git/dismagr/line/dockerfile + command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.202" , "172.20.0.201" ] + line105: + networks: + network1: + ipv4_address: 172.20.0.105 + ports: + - 8105:8000 + build: + context: . + dockerfile: /home/jonny0815/git/dismagr/line/dockerfile + command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.201"] + line106: + networks: + network1: + ipv4_address: 172.20.0.106 + ports: + - 8106:8000 + build: + context: . + dockerfile: /home/jonny0815/git/dismagr/line/dockerfile + command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.202" ] part11: networks: network1: @@ -78,9 +128,7 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile - command: [ "python", "./participant.py", "--con", "172.20.0.102"] - profiles: - - donotstart + command: [ "python", "./participant.py", "--con", "172.20.0.105"] part13: networks: network1: @@ -90,6 +138,4 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile - command: [ "python", "./participant.py", "--con", "172.20.0.103"] - profiles: - - donotstart + command: [ "python", "./participant.py", "--con", "172.20.0.106"] \ No newline at end of file diff --git a/exchange.py b/exchange.py index c205277b019da927533318eac5f891662a311ccd..1ee3cb98636fc14803c6e604be4327a67f83281c 100644 --- a/exchange.py +++ b/exchange.py @@ -1,21 +1,23 @@ -#from main import ureg,Q_ +# from main import ureg,Q_ import locale import time -from datetime import datetime,timedelta +from datetime import datetime, timedelta + class Trade: pass + + class Exchange: - def __init__(self,pk: bytes, p: float, ppw: float): + def __init__(self, pk: bytes, p: float, ppw: float): self.pubicKey: bytes = pk self.timestamp: int = time.time_ns() self.__isActive: bool = False - #self.__power = Q_(p, ureg.watt) + # self.__power = Q_(p, ureg.watt) self.__pricePerWatt = locale.currency(ppw) self.__trades: set[Trade] = set() self.__sig: bytes - def activate(self) -> None: self.__isActive = True @@ -53,4 +55,8 @@ class Exchange: if start_of_minute_unix <= self.timestamp <= end_of_minute_unix: return True else: - return False \ No newline at end of file + return False + + def is_old(self) -> bool: + now = datetime.now() + return self.timestamp < int(now.timestamp()) diff --git a/participant.py b/participant.py index de941bf8da151e2defda2ff01149b3fb65a616a6..a34c944385478bfe040e06ec6d9629bd57f6d968 100644 --- a/participant.py +++ b/participant.py @@ -12,6 +12,11 @@ import httpx import argparse import rustworkx as rx +from rustworkx.visualization import mpl_draw +import base64 +from io import BytesIO +from fastapi.responses import HTMLResponse +import matplotlib.pyplot as plt import jsonpickle import uvicorn @@ -56,7 +61,7 @@ class Participant: self.grid: rx.PyGraph = rx.PyGraph() self.gridnodeindexes: dict[bytes:int] = dict() # mapping of public keys to indexes in the grid Graph self.gridedgeindexes: dict[bytes:set[int]] = defaultdict(set) - self.knownIPs: set[ipaddress.IPv4Address] = set() + self.knownIPs: set[ipaddress.IPv4Address] = set() # known IPs from other participants self.remoteParticipants: set[RemoteParticipant] = set() self.availableExchanges: set[ Exchange] = set() # known available exchanges from other participants for next turns @@ -70,9 +75,11 @@ class Participant: # self.__projectedInhouseSupply = Q_(0, ureg.watt) # expected supply for next round # register rest endpoints with fastapi - self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteParticipantJSON, + self.fastRouter.add_api_route("/allRemoteParticipantsJSON", self.allRemoteParticipantsJSON, methods=["GET"]) + self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteJSON, methods=["GET"]) self.fastRouter.add_api_route("/get_knownIPs", self.get_knownIPs, methods=["GET"]) + self.fastRouter.add_api_route("/getgridpng", self.getGridPNG, methods=["GET"]) # announce self on dht asyncio.run(self.dht_startup()) # self.loop.run_in_executor(None, self.dht_startup) @@ -80,14 +87,23 @@ class Participant: asyncio.run(self.dht_update_peers()) # retrieve information from my grid connection asyncio.run(self.buildOwnLine(con_ip)) + # discover the grid + + time.sleep(10) + + asyncio.run(self.discoverGrid()) # set up scheduling for tasks - self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=20, id='1', + self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=15, id='1', name='requestRemoteParticipants') logger.info("Finished setup on " + self.publicIP.__str__()) def as_remote_participant(self) -> RemoteParticipant: - return RemoteParticipant(pk=self.publicKey, ip=self.publicIP, nex=self.availableExchanges) + return RemoteParticipant(pk=self.publicKey, ip=self.publicIP, nex=self.availableExchanges, + adjlines=self.adjacentLines) + + async def asRemoteJSON(self): + return jsonpickle.encode(self.as_remote_participant()) async def dht_startup(self) -> None: logger.info("dht_startup on " + self.publicIP.__str__()) @@ -104,7 +120,7 @@ class Participant: self.knownIPs.update(dht_ips) # fastapi call - async def asRemoteParticipantJSON(self): + async def allRemoteParticipantsJSON(self): # all known participants including self all_remoteparticipants: set[RemoteParticipant] = set() all_remoteparticipants.add(self.as_remote_participant()) all_remoteparticipants.update(self.remoteParticipants) @@ -113,16 +129,50 @@ class Participant: def update_remoteparticipants(self, new_rps: set[RemoteParticipant], target_rps: set[RemoteParticipant]): for new_rp in new_rps: for rp in target_rps: - if new_rp.publicKey == rp.publicKey: # found same participant - pass # TODO + if new_rp == rp: # found same participant + rp.update_nextExchanges(new_rp.nextExchanges) + rp.timestamp = new_rp.timestamp + else: # participant is new + target_rps.add(new_rp) + + async def insert_participants_into_grid(self, all_rps: set[RemoteParticipant]): + logger.info("===> hello from insert into grid") + for rp in all_rps: + if rp.publicKey not in self.gridnodeindexes.keys(): # found new participant, add and connect it + logger.info("===> found unknown participant: " + rp.publicIP.__str__()) + i = self.grid.add_node(rp) + self.gridnodeindexes[rp.publicKey] = i + for line in rp.adjacentLines: # should only be one line that connects the participant to the grid + async with httpx.AsyncClient() as client: + response_edge = await client.get("http://" + line.publicIP.__str__() + ":8000/asRemoteJSON") + logger.info("===> insert_part: getting line from: " + line.publicIP.__str__()) + rline: RemoteLine = jsonpickle.decode(response_edge.json()) + for trafo in rline.adjacentConnections: # participant should be connected to a transformer + if trafo.publicKey in self.gridnodeindexes.keys(): + i_edge = self.grid.add_edge(i, self.gridnodeindexes[trafo.publicKey], rline) + self.gridedgeindexes[rline.publicKey].add(i_edge) + break async def request_remoteparticipants(self) -> None: + logger.info("===> requesting remote participants on " + self.publicIP.__str__()) all_remoteparticipants: set[RemoteParticipant] = set() if len(self.remoteParticipants) == 0: # initial discovery via dht await self.dht_update_peers() for ip in self.knownIPs: try: - url = 'http://' + ip.__str__() + ':8000/asRemoteJSON' + url = 'http://' + ip.__str__() + ':8000/allRemoteParticipantsJSON' + async with httpx.AsyncClient() as client: + response = await client.get(url) + remoteparticipants: set[RemoteParticipant] = jsonpickle.decode(response.json()) + # some validation of remoteparticipants should happen, not in scope of this thesis + all_remoteparticipants.update(remoteparticipants) + except httpx.HTTPError as err: + logger.error("httpx Error: " + err.__str__()) + continue + else: # continued peer updates without dht, periodic dht discovery should still happen to discover new users + for rp in self.remoteParticipants: + try: + url = 'http://' + rp.publicIP.__str__() + ':8000/allRemoteParticipantsJSON' async with httpx.AsyncClient() as client: response = await client.get(url) remoteparticipants: set[RemoteParticipant] = jsonpickle.decode(response.json()) @@ -132,11 +182,8 @@ class Participant: logger.error("httpx Error: " + err.__str__()) continue - self.remoteParticipants.update(all_remoteparticipants) - self.knownIPs.clear() - return - else: # TODO: continued peer updates without dht, periodic dht discovery should still happen to discover new users - pass + self.update_remoteparticipants(all_remoteparticipants, self.remoteParticipants) + await self.insert_participants_into_grid(all_remoteparticipants) async def get_knownIPs(self): return jsonpickle.encode(self.knownIPs) @@ -157,14 +204,49 @@ class Participant: await self.discoverFromNode(self.publicKey) async def discoverFromNode(self, nodepublickey: bytes): - for lin in self.grid[self.gridnodeindexes[nodepublickey]].adjacentLines: # for all lines connected to given part/trafo + for lin in self.grid[self.gridnodeindexes[nodepublickey]].adjacentLines: # for lines connected to part/trafo async with httpx.AsyncClient() as client: - response = await client.get("http://" + lin.publicIP.__str__() + ":8000/asRemoteJSON") - rlin: RemoteLine = jsonpickle.decode(response.json()) + response_edge = await client.get("http://" + lin.publicIP.__str__() + ":8000/asRemoteJSON") + logger.info("===> getting line from: " + lin.publicIP.__str__()) + rlin: RemoteLine = jsonpickle.decode(response_edge.json()) for con in rlin.adjacentConnections: - pass - - + if con.publicKey not in self.gridnodeindexes.keys(): + response_node = await client.get("http://" + con.publicIP.__str__() + ":8000/asRemoteJSON") + logger.info("===> getting con from: " + con.publicIP.__str__()) + rcon = jsonpickle.decode(response_node.json()) + i_node = self.grid.add_node(rcon) + logger.info("===> created node for " + rcon.publicIP.__str__()) + self.gridnodeindexes[rcon.publicKey] = i_node + i_edge = self.grid.add_edge(self.gridnodeindexes[nodepublickey], i_node, rlin) + logger.info("===> created edge") + self.gridedgeindexes[rlin.publicKey].add(i_edge) + await self.discoverFromNode(rcon.publicKey) + else: + logger.info("===> already known node: " + con.publicIP.__str__()) + if rlin.publicKey not in self.gridedgeindexes.keys() and nodepublickey != con.publicKey: + i_edge = self.grid.add_edge(self.gridnodeindexes[nodepublickey], + self.gridnodeindexes[con.publicKey], rlin) + self.gridedgeindexes[rlin.publicKey].add(i_edge) + logger.info("===> created edge") + + async def getGridPNG(self): + plt.clf() + img = mpl_draw(self.grid, with_labels=True, edge_labels=str, labels=str) + buf = BytesIO() + img.savefig(buf, format='png') + buf.seek(0) + img_base64 = base64.b64encode(buf.read()).decode('utf-8') + html_content = f""" + <html> + <body> + <img src="data:image/png;base64,{img_base64}" /> + </body> + </html> + """ + return HTMLResponse(content=html_content) + + async def produceNextTrade(self): + pass if __name__ == "__main__": diff --git a/participant/dockerfile b/participant/dockerfile index 3c0bd3e4d7f388558f3851c9ae17eab1339a7954..706d513666e37e5e3e21a4f9bb9d49de8bd12a17 100644 --- a/participant/dockerfile +++ b/participant/dockerfile @@ -1,6 +1,6 @@ FROM python:3.12 # Or any preferred Python version. ADD exchange.py offer.py participant.py request.py trade.py remoteparticipant.py settings.py remoteconnection.py remotetransformer.py remoteline.py route.py util.py . -RUN pip install numpy pint fastapi uvicorn dilithium jsonpickle requests httpx apscheduler rustworkx +RUN pip install numpy pint fastapi uvicorn dilithium jsonpickle requests httpx apscheduler rustworkx matplotlib CMD ["python", "./participant.py"] # Or enter the name of your unique directory and parameter set. \ No newline at end of file diff --git a/remoteconnection.py b/remoteconnection.py index 437ee0b5c63b8794f20d33055b7196de01c85383..db3400bc71ce73b71c019362685a1e5748d15173 100644 --- a/remoteconnection.py +++ b/remoteconnection.py @@ -16,3 +16,6 @@ class RemoteConnection: def __hash__(self): return hash(self.publicKey) + + def __str__(self): + return self.publicIP.__str__() diff --git a/remoteline.py b/remoteline.py index 880257fd539d88aa6811b65704010af745a58da9..1b31a3314974ffffefe0fbc869749da926b2bd0a 100644 --- a/remoteline.py +++ b/remoteline.py @@ -2,8 +2,6 @@ import ipaddress import time import remoteconnection -import remotetransformer -import remoteparticipant class RemoteLine(remoteconnection.RemoteConnection): diff --git a/remoteparticipant.py b/remoteparticipant.py index e3b47f593177291e026bd0f0a4fb8712a2fb2bd1..0437c1077f0af2af94a31f6f86de175c9097c5d0 100644 --- a/remoteparticipant.py +++ b/remoteparticipant.py @@ -5,21 +5,27 @@ from datetime import datetime, timedelta from pydantic.dataclasses import dataclass import exchange +import remoteline class RemoteParticipant: - def __init__(self, pk: bytes, ip: ipaddress.IPv4Address, nex: set[exchange.Exchange]): + def __init__(self, pk: bytes, ip: ipaddress.IPv4Address, nex: set[exchange.Exchange], + adjlines: set[remoteline.RemoteLine]): self.publicKey: bytes = pk self.publicIP: ipaddress.IPv4Address = ip self.nextExchanges: set[exchange.Exchange] = nex + self.adjacentLines: set[remoteline.RemoteLine] = adjlines self.timestamp = time.time_ns() def update_nextExchanges(self, exchanges: set[exchange.Exchange]): - for new_ex in exchanges: - for ex in self.nextExchanges: - # both exchanges origin from same participant and are within the next minute - if new_ex.pubicKey == ex.pubicKey and new_ex.is_min_next() and ex.is_min_next(): - pass + for ex in self.nextExchanges: # clear all old exchanges + if ex.is_old(): + self.nextExchanges.remove(ex) + + self.nextExchanges = self.nextExchanges.union(exchanges) + + + def is_min_now(self) -> bool: now = datetime.now() @@ -41,4 +47,7 @@ class RemoteParticipant: return self.publicKey == other.publicKey def __hash__(self): - return hash(self.publicKey) \ No newline at end of file + return hash(self.publicKey) + + def __str__(self): + return self.publicIP.__str__() diff --git a/remotetransformer.py b/remotetransformer.py index 125e18d0bda24ce47effd409b2dd67a3444bacaf..cb5db4202ad832eb42d0937b8992f31572f3e3b5 100644 --- a/remotetransformer.py +++ b/remotetransformer.py @@ -9,4 +9,4 @@ class RemoteTransformer(remoteconnection.RemoteConnection): def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float, cons: set['remoteline.RemoteLine'], ll): super().__init__(ip, pk, cap, ucap, ll) - self.adjacentConnections: set[remoteline.RemoteLine] = cons + self.adjacentLines: set[remoteline.RemoteLine] = cons diff --git a/shell.nix b/shell.nix index 9df523afe8ceb728e7fe4ea036401be299c2d7cd..d022130c2accded70e5ccb014827fd58a6c1534e 100644 --- a/shell.nix +++ b/shell.nix @@ -12,6 +12,7 @@ let python-pkgs.httpx python-pkgs.apscheduler python-pkgs.rustworkx + python-pkgs.matplotlib (python-pkgs.callPackage ./dilithium.nix { }) ]); in