Skip to content
Snippets Groups Projects
Commit f3f6780e authored by Johannes Hitzinger's avatar Johannes Hitzinger
Browse files

grid discovery completely done, all participants can find each other

parent 5f4efca0
Branches
No related tags found
No related merge requests found
......@@ -19,16 +19,36 @@ services:
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/dht/dockerfile
transformer100:
transformer200:
networks:
network1:
ipv4_address: 172.20.0.100
ipv4_address: 172.20.0.200
ports:
- 8100:8000
- 8200:8000
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/transformer/dockerfile
command: ["python", "./transformer.py", "--cap", "1000", "--cons", "172.20.0.101", "172.20.0.102", "172.20.0.103"]
command: ["python", "./transformer.py", "--cap", "1000", "--cons", "172.20.0.101", "172.20.0.102", "172.20.0.103" ]
transformer201:
networks:
network1:
ipv4_address: 172.20.0.201
ports:
- 8201:8000
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/transformer/dockerfile
command: [ "python", "./transformer.py", "--cap", "1000", "--cons", "172.20.0.104", "172.20.0.103" ]
transformer202:
networks:
network1:
ipv4_address: 172.20.0.202
ports:
- 8202:8000
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/transformer/dockerfile
command: [ "python", "./transformer.py", "--cap", "1000", "--cons", "172.20.0.104", "172.20.0.102"]
line101:
networks:
network1:
......@@ -38,7 +58,7 @@ services:
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100"]
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.200"]
line102:
networks:
network1:
......@@ -48,7 +68,7 @@ services:
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100" ]
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.200", "172.20.0.202" ]
line103:
networks:
network1:
......@@ -58,7 +78,37 @@ services:
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100" ]
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.200" , "172.20.0.201"]
line104:
networks:
network1:
ipv4_address: 172.20.0.104
ports:
- 8104:8000
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.202" , "172.20.0.201" ]
line105:
networks:
network1:
ipv4_address: 172.20.0.105
ports:
- 8105:8000
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.201"]
line106:
networks:
network1:
ipv4_address: 172.20.0.106
ports:
- 8106:8000
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.202" ]
part11:
networks:
network1:
......@@ -78,9 +128,7 @@ services:
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile
command: [ "python", "./participant.py", "--con", "172.20.0.102"]
profiles:
- donotstart
command: [ "python", "./participant.py", "--con", "172.20.0.105"]
part13:
networks:
network1:
......@@ -90,6 +138,4 @@ services:
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile
command: [ "python", "./participant.py", "--con", "172.20.0.103"]
profiles:
- donotstart
command: [ "python", "./participant.py", "--con", "172.20.0.106"]
\ No newline at end of file
#from main import ureg,Q_
# from main import ureg,Q_
import locale
import time
from datetime import datetime,timedelta
from datetime import datetime, timedelta
class Trade:
pass
class Exchange:
def __init__(self,pk: bytes, p: float, ppw: float):
def __init__(self, pk: bytes, p: float, ppw: float):
self.pubicKey: bytes = pk
self.timestamp: int = time.time_ns()
self.__isActive: bool = False
#self.__power = Q_(p, ureg.watt)
# self.__power = Q_(p, ureg.watt)
self.__pricePerWatt = locale.currency(ppw)
self.__trades: set[Trade] = set()
self.__sig: bytes
def activate(self) -> None:
self.__isActive = True
......@@ -53,4 +55,8 @@ class Exchange:
if start_of_minute_unix <= self.timestamp <= end_of_minute_unix:
return True
else:
return False
\ No newline at end of file
return False
def is_old(self) -> bool:
now = datetime.now()
return self.timestamp < int(now.timestamp())
......@@ -12,6 +12,11 @@ import httpx
import argparse
import rustworkx as rx
from rustworkx.visualization import mpl_draw
import base64
from io import BytesIO
from fastapi.responses import HTMLResponse
import matplotlib.pyplot as plt
import jsonpickle
import uvicorn
......@@ -56,7 +61,7 @@ class Participant:
self.grid: rx.PyGraph = rx.PyGraph()
self.gridnodeindexes: dict[bytes:int] = dict() # mapping of public keys to indexes in the grid Graph
self.gridedgeindexes: dict[bytes:set[int]] = defaultdict(set)
self.knownIPs: set[ipaddress.IPv4Address] = set()
self.knownIPs: set[ipaddress.IPv4Address] = set() # known IPs from other participants
self.remoteParticipants: set[RemoteParticipant] = set()
self.availableExchanges: set[
Exchange] = set() # known available exchanges from other participants for next turns
......@@ -70,9 +75,11 @@ class Participant:
# self.__projectedInhouseSupply = Q_(0, ureg.watt) # expected supply for next round
# register rest endpoints with fastapi
self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteParticipantJSON,
self.fastRouter.add_api_route("/allRemoteParticipantsJSON", self.allRemoteParticipantsJSON,
methods=["GET"])
self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteJSON, methods=["GET"])
self.fastRouter.add_api_route("/get_knownIPs", self.get_knownIPs, methods=["GET"])
self.fastRouter.add_api_route("/getgridpng", self.getGridPNG, methods=["GET"])
# announce self on dht
asyncio.run(self.dht_startup())
# self.loop.run_in_executor(None, self.dht_startup)
......@@ -80,14 +87,23 @@ class Participant:
asyncio.run(self.dht_update_peers())
# retrieve information from my grid connection
asyncio.run(self.buildOwnLine(con_ip))
# discover the grid
time.sleep(10)
asyncio.run(self.discoverGrid())
# set up scheduling for tasks
self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=20, id='1',
self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=15, id='1',
name='requestRemoteParticipants')
logger.info("Finished setup on " + self.publicIP.__str__())
def as_remote_participant(self) -> RemoteParticipant:
return RemoteParticipant(pk=self.publicKey, ip=self.publicIP, nex=self.availableExchanges)
return RemoteParticipant(pk=self.publicKey, ip=self.publicIP, nex=self.availableExchanges,
adjlines=self.adjacentLines)
async def asRemoteJSON(self):
return jsonpickle.encode(self.as_remote_participant())
async def dht_startup(self) -> None:
logger.info("dht_startup on " + self.publicIP.__str__())
......@@ -104,7 +120,7 @@ class Participant:
self.knownIPs.update(dht_ips)
# fastapi call
async def asRemoteParticipantJSON(self):
async def allRemoteParticipantsJSON(self): # all known participants including self
all_remoteparticipants: set[RemoteParticipant] = set()
all_remoteparticipants.add(self.as_remote_participant())
all_remoteparticipants.update(self.remoteParticipants)
......@@ -113,16 +129,50 @@ class Participant:
def update_remoteparticipants(self, new_rps: set[RemoteParticipant], target_rps: set[RemoteParticipant]):
for new_rp in new_rps:
for rp in target_rps:
if new_rp.publicKey == rp.publicKey: # found same participant
pass # TODO
if new_rp == rp: # found same participant
rp.update_nextExchanges(new_rp.nextExchanges)
rp.timestamp = new_rp.timestamp
else: # participant is new
target_rps.add(new_rp)
async def insert_participants_into_grid(self, all_rps: set[RemoteParticipant]):
logger.info("===> hello from insert into grid")
for rp in all_rps:
if rp.publicKey not in self.gridnodeindexes.keys(): # found new participant, add and connect it
logger.info("===> found unknown participant: " + rp.publicIP.__str__())
i = self.grid.add_node(rp)
self.gridnodeindexes[rp.publicKey] = i
for line in rp.adjacentLines: # should only be one line that connects the participant to the grid
async with httpx.AsyncClient() as client:
response_edge = await client.get("http://" + line.publicIP.__str__() + ":8000/asRemoteJSON")
logger.info("===> insert_part: getting line from: " + line.publicIP.__str__())
rline: RemoteLine = jsonpickle.decode(response_edge.json())
for trafo in rline.adjacentConnections: # participant should be connected to a transformer
if trafo.publicKey in self.gridnodeindexes.keys():
i_edge = self.grid.add_edge(i, self.gridnodeindexes[trafo.publicKey], rline)
self.gridedgeindexes[rline.publicKey].add(i_edge)
break
async def request_remoteparticipants(self) -> None:
logger.info("===> requesting remote participants on " + self.publicIP.__str__())
all_remoteparticipants: set[RemoteParticipant] = set()
if len(self.remoteParticipants) == 0: # initial discovery via dht
await self.dht_update_peers()
for ip in self.knownIPs:
try:
url = 'http://' + ip.__str__() + ':8000/asRemoteJSON'
url = 'http://' + ip.__str__() + ':8000/allRemoteParticipantsJSON'
async with httpx.AsyncClient() as client:
response = await client.get(url)
remoteparticipants: set[RemoteParticipant] = jsonpickle.decode(response.json())
# some validation of remoteparticipants should happen, not in scope of this thesis
all_remoteparticipants.update(remoteparticipants)
except httpx.HTTPError as err:
logger.error("httpx Error: " + err.__str__())
continue
else: # continued peer updates without dht, periodic dht discovery should still happen to discover new users
for rp in self.remoteParticipants:
try:
url = 'http://' + rp.publicIP.__str__() + ':8000/allRemoteParticipantsJSON'
async with httpx.AsyncClient() as client:
response = await client.get(url)
remoteparticipants: set[RemoteParticipant] = jsonpickle.decode(response.json())
......@@ -132,11 +182,8 @@ class Participant:
logger.error("httpx Error: " + err.__str__())
continue
self.remoteParticipants.update(all_remoteparticipants)
self.knownIPs.clear()
return
else: # TODO: continued peer updates without dht, periodic dht discovery should still happen to discover new users
pass
self.update_remoteparticipants(all_remoteparticipants, self.remoteParticipants)
await self.insert_participants_into_grid(all_remoteparticipants)
async def get_knownIPs(self):
return jsonpickle.encode(self.knownIPs)
......@@ -157,14 +204,49 @@ class Participant:
await self.discoverFromNode(self.publicKey)
async def discoverFromNode(self, nodepublickey: bytes):
for lin in self.grid[self.gridnodeindexes[nodepublickey]].adjacentLines: # for all lines connected to given part/trafo
for lin in self.grid[self.gridnodeindexes[nodepublickey]].adjacentLines: # for lines connected to part/trafo
async with httpx.AsyncClient() as client:
response = await client.get("http://" + lin.publicIP.__str__() + ":8000/asRemoteJSON")
rlin: RemoteLine = jsonpickle.decode(response.json())
response_edge = await client.get("http://" + lin.publicIP.__str__() + ":8000/asRemoteJSON")
logger.info("===> getting line from: " + lin.publicIP.__str__())
rlin: RemoteLine = jsonpickle.decode(response_edge.json())
for con in rlin.adjacentConnections:
pass
if con.publicKey not in self.gridnodeindexes.keys():
response_node = await client.get("http://" + con.publicIP.__str__() + ":8000/asRemoteJSON")
logger.info("===> getting con from: " + con.publicIP.__str__())
rcon = jsonpickle.decode(response_node.json())
i_node = self.grid.add_node(rcon)
logger.info("===> created node for " + rcon.publicIP.__str__())
self.gridnodeindexes[rcon.publicKey] = i_node
i_edge = self.grid.add_edge(self.gridnodeindexes[nodepublickey], i_node, rlin)
logger.info("===> created edge")
self.gridedgeindexes[rlin.publicKey].add(i_edge)
await self.discoverFromNode(rcon.publicKey)
else:
logger.info("===> already known node: " + con.publicIP.__str__())
if rlin.publicKey not in self.gridedgeindexes.keys() and nodepublickey != con.publicKey:
i_edge = self.grid.add_edge(self.gridnodeindexes[nodepublickey],
self.gridnodeindexes[con.publicKey], rlin)
self.gridedgeindexes[rlin.publicKey].add(i_edge)
logger.info("===> created edge")
async def getGridPNG(self):
plt.clf()
img = mpl_draw(self.grid, with_labels=True, edge_labels=str, labels=str)
buf = BytesIO()
img.savefig(buf, format='png')
buf.seek(0)
img_base64 = base64.b64encode(buf.read()).decode('utf-8')
html_content = f"""
<html>
<body>
<img src="data:image/png;base64,{img_base64}" />
</body>
</html>
"""
return HTMLResponse(content=html_content)
async def produceNextTrade(self):
pass
if __name__ == "__main__":
......
FROM python:3.12
# Or any preferred Python version.
ADD exchange.py offer.py participant.py request.py trade.py remoteparticipant.py settings.py remoteconnection.py remotetransformer.py remoteline.py route.py util.py .
RUN pip install numpy pint fastapi uvicorn dilithium jsonpickle requests httpx apscheduler rustworkx
RUN pip install numpy pint fastapi uvicorn dilithium jsonpickle requests httpx apscheduler rustworkx matplotlib
CMD ["python", "./participant.py"]
# Or enter the name of your unique directory and parameter set.
\ No newline at end of file
......@@ -16,3 +16,6 @@ class RemoteConnection:
def __hash__(self):
return hash(self.publicKey)
def __str__(self):
return self.publicIP.__str__()
......@@ -2,8 +2,6 @@ import ipaddress
import time
import remoteconnection
import remotetransformer
import remoteparticipant
class RemoteLine(remoteconnection.RemoteConnection):
......
......@@ -5,21 +5,27 @@ from datetime import datetime, timedelta
from pydantic.dataclasses import dataclass
import exchange
import remoteline
class RemoteParticipant:
def __init__(self, pk: bytes, ip: ipaddress.IPv4Address, nex: set[exchange.Exchange]):
def __init__(self, pk: bytes, ip: ipaddress.IPv4Address, nex: set[exchange.Exchange],
adjlines: set[remoteline.RemoteLine]):
self.publicKey: bytes = pk
self.publicIP: ipaddress.IPv4Address = ip
self.nextExchanges: set[exchange.Exchange] = nex
self.adjacentLines: set[remoteline.RemoteLine] = adjlines
self.timestamp = time.time_ns()
def update_nextExchanges(self, exchanges: set[exchange.Exchange]):
for new_ex in exchanges:
for ex in self.nextExchanges:
# both exchanges origin from same participant and are within the next minute
if new_ex.pubicKey == ex.pubicKey and new_ex.is_min_next() and ex.is_min_next():
pass
for ex in self.nextExchanges: # clear all old exchanges
if ex.is_old():
self.nextExchanges.remove(ex)
self.nextExchanges = self.nextExchanges.union(exchanges)
def is_min_now(self) -> bool:
now = datetime.now()
......@@ -41,4 +47,7 @@ class RemoteParticipant:
return self.publicKey == other.publicKey
def __hash__(self):
return hash(self.publicKey)
\ No newline at end of file
return hash(self.publicKey)
def __str__(self):
return self.publicIP.__str__()
......@@ -9,4 +9,4 @@ class RemoteTransformer(remoteconnection.RemoteConnection):
def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float,
cons: set['remoteline.RemoteLine'], ll):
super().__init__(ip, pk, cap, ucap, ll)
self.adjacentConnections: set[remoteline.RemoteLine] = cons
self.adjacentLines: set[remoteline.RemoteLine] = cons
......@@ -12,6 +12,7 @@ let
python-pkgs.httpx
python-pkgs.apscheduler
python-pkgs.rustworkx
python-pkgs.matplotlib
(python-pkgs.callPackage ./dilithium.nix { })
]);
in
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment