diff --git a/docker-compose.yaml b/docker-compose.yaml index 1417fce06683c14b70e50b815ce596f3111fc13a..d8514483450743c537fcddc1594562e1253c9534 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -79,6 +79,8 @@ services: context: . dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile command: [ "python", "./participant.py", "--con", "172.20.0.102"] + profiles: + - donotstart part13: networks: network1: @@ -89,3 +91,5 @@ services: context: . dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile command: [ "python", "./participant.py", "--con", "172.20.0.103"] + profiles: + - donotstart diff --git a/participant.py b/participant.py index c3e233059fa61b6e12ab6ac725971b7c05d043d9..de941bf8da151e2defda2ff01149b3fb65a616a6 100644 --- a/participant.py +++ b/participant.py @@ -4,6 +4,7 @@ import time from contextlib import asynccontextmanager from datetime import datetime, timedelta +from collections import defaultdict from random import randint import ipaddress import asyncio @@ -42,7 +43,7 @@ class Participant: def __init__(self, nid: str, dht_ep: set[ipaddress.IPv4Address], con_ip: ipaddress.IPv4Address, scd: AsyncIOScheduler, ip: ipaddress = get_pub_ip()): self.dil = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"]) - self.__secretKey, self.__publicKey = self.dil.keygen( + self.__secretKey, self.publicKey = self.dil.keygen( [randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max)]) self.fastRouter = APIRouter() @@ -51,9 +52,10 @@ class Participant: self.__dht_network_id = nid self.publicIP: ipaddress.IPv4Address = ipaddress.IPv4Address( ip) # set to current ipv4, confirm before each new round - self.line: RemoteLine = None + self.adjacentLines: set[RemoteLine] = set() self.grid: rx.PyGraph = rx.PyGraph() - self.gridindexes: dict[bytes:int] = dict() # mapping of public keys to indexes in the grid Graph + self.gridnodeindexes: dict[bytes:int] = dict() # mapping of public keys to indexes in the grid Graph + self.gridedgeindexes: dict[bytes:set[int]] = defaultdict(set) self.knownIPs: set[ipaddress.IPv4Address] = set() self.remoteParticipants: set[RemoteParticipant] = set() self.availableExchanges: set[ @@ -68,10 +70,9 @@ class Participant: # self.__projectedInhouseSupply = Q_(0, ureg.watt) # expected supply for next round # register rest endpoints with fastapi - self.fastRouter.add_api_route("/get_remoteparticipants_asJSON", self.get_remoteparticipants_asjson, + self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteParticipantJSON, methods=["GET"]) self.fastRouter.add_api_route("/get_knownIPs", self.get_knownIPs, methods=["GET"]) - self.fastRouter.add_api_route("/get_ownLine", self.get_ownline, methods=["GET"]) # announce self on dht asyncio.run(self.dht_startup()) # self.loop.run_in_executor(None, self.dht_startup) @@ -86,7 +87,7 @@ class Participant: logger.info("Finished setup on " + self.publicIP.__str__()) def as_remote_participant(self) -> RemoteParticipant: - return RemoteParticipant(pk=self.__publicKey, ip=self.publicIP, nex=self.availableExchanges) + return RemoteParticipant(pk=self.publicKey, ip=self.publicIP, nex=self.availableExchanges) async def dht_startup(self) -> None: logger.info("dht_startup on " + self.publicIP.__str__()) @@ -103,7 +104,7 @@ class Participant: self.knownIPs.update(dht_ips) # fastapi call - async def get_remoteparticipants_asjson(self): + async def asRemoteParticipantJSON(self): all_remoteparticipants: set[RemoteParticipant] = set() all_remoteparticipants.add(self.as_remote_participant()) all_remoteparticipants.update(self.remoteParticipants) @@ -121,7 +122,7 @@ class Participant: await self.dht_update_peers() for ip in self.knownIPs: try: - url = 'http://' + ip.__str__() + ':8000/get_remoteparticipants_asJSON' + url = 'http://' + ip.__str__() + ':8000/asRemoteJSON' async with httpx.AsyncClient() as client: response = await client.get(url) remoteparticipants: set[RemoteParticipant] = jsonpickle.decode(response.json()) @@ -134,9 +135,8 @@ class Participant: self.remoteParticipants.update(all_remoteparticipants) self.knownIPs.clear() return - else: # continued peer updates without dht, periodic dht discovery should still happen to discover new users - logger.info( - "===>" + self.publicIP.__str__() + " has connection " + self.line.publicIP.__str__()) # TODO + else: # TODO: continued peer updates without dht, periodic dht discovery should still happen to discover new users + pass async def get_knownIPs(self): return jsonpickle.encode(self.knownIPs) @@ -146,29 +146,25 @@ class Participant: async with httpx.AsyncClient() as client: response = await client.get("http://" + con_ip.__str__() + ":8000/asRemoteJSON") - self.line = jsonpickle.decode(response.json()) - logger.info("===> finished buildOwnLine on " + self.publicIP.__str__() + " with " + self.line.publicIP.__str__()) - - async def get_ownline(self): - return jsonpickle.encode(self.line) + line = jsonpickle.decode(response.json()) + self.adjacentLines.add(line) + logger.info( + "===> finished buildOwnLine on " + self.publicIP.__str__() + " with " + line.publicIP.__str__()) async def discoverGrid(self): # this is not robust - pass - ''' - i: int = self.grid.add_node(self.connection) - self.gridindexes[self.connection.publicKey] = i - await self.discoverFromNode(self.connection.publicKey) - ''' + i: int = self.grid.add_node(self.as_remote_participant()) + self.gridnodeindexes[self.publicKey] = i + await self.discoverFromNode(self.publicKey) async def discoverFromNode(self, nodepublickey: bytes): - for con in self.grid[self.gridindexes[nodepublickey]].adjacent_connections: + for lin in self.grid[self.gridnodeindexes[nodepublickey]].adjacentLines: # for all lines connected to given part/trafo async with httpx.AsyncClient() as client: - response = await client.get("http://" + con.publicIP.__str__() + ":8000/asRemoteConnectionJSON") - rc = jsonpickle.decode(response.json()) - if rc.publicKey not in self.gridindexes: - i: int = self.grid.add_node(rc) - self.gridindexes[rc.publicKey] = i - self.grid.add_edge(self.gridindexes[nodepublickey], i, con.loneloss) + response = await client.get("http://" + lin.publicIP.__str__() + ":8000/asRemoteJSON") + rlin: RemoteLine = jsonpickle.decode(response.json()) + for con in rlin.adjacentConnections: + pass + + if __name__ == "__main__":