diff --git a/docker-compose.yaml b/docker-compose.yaml index 8ebdd355cbc6c37d3088768417bdd6c71fbf2d96..e073f704d6d122f3fbede383a0aea723104e307b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -118,7 +118,7 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile - command: [ "python", "./participant.py", "--con", "172.20.0.101", "--cc", "0.3", "--sh", "0.7", "--dmd", "1000", "--solar", "4000"] + command: [ "python", "./participant.py", "--con", "172.20.0.101", "--cc", "0.2", "--sh", "0.8", "--dmd", "1000", "--solar", "4000"] part12: networks: network1: @@ -128,7 +128,7 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile - command: [ "python", "./participant.py", "--con", "172.20.0.105", "--cc", "0.3", "--sh", "0.7", "--dmd", "300", "--solar", "4000"] + command: [ "python", "./participant.py", "--con", "172.20.0.105", "--cc", "0.2", "--sh", "0.8", "--dmd", "300", "--solar", "4000"] part13: networks: network1: @@ -138,4 +138,4 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile - command: [ "python", "./participant.py", "--con", "172.20.0.106", "--cc", "0.8", "--sh", "0.2", "--dmd", "1000", "--solar", "4000"] \ No newline at end of file + command: [ "python", "./participant.py", "--con", "172.20.0.106", "--cc", "0.7", "--sh", "0.7", "--dmd", "800", "--solar", "4000"] \ No newline at end of file diff --git a/exchange.py b/exchange.py index 1ee3cb98636fc14803c6e604be4327a67f83281c..3a92a4cdc4b90024243f33465dd4ee074d481941 100644 --- a/exchange.py +++ b/exchange.py @@ -1,5 +1,4 @@ -# from main import ureg,Q_ -import locale +#from participant import ureg, Q_ import time from datetime import datetime, timedelta @@ -13,8 +12,8 @@ class Exchange: self.pubicKey: bytes = pk self.timestamp: int = time.time_ns() self.__isActive: bool = False - # self.__power = Q_(p, ureg.watt) - self.__pricePerWatt = locale.currency(ppw) + self.power = Q_(p, ureg.watt) + self.__pricePerWatt = ppw # this is in EUR, no new cryptocurrency should be created for this self.__trades: set[Trade] = set() self.__sig: bytes diff --git a/main.py b/main.py index 0b25a83ea589ae996703c7988cf72f66c37dfc29..4f65d7774206b80ec14dffcb7c14e33e43978d9a 100644 --- a/main.py +++ b/main.py @@ -26,6 +26,11 @@ import settings import participant import DHTdummy +import rustworkx as rx +import matplotlib.pyplot as plt +from rustworkx.visualization import mpl_draw + + ureg = UnitRegistry() Q_ = ureg.Quantity @@ -47,10 +52,50 @@ def get_ip(): return IP -# --------------- Examples ----------------------- +class GraphNode: + def __init__(self, value): + self.index = None + self.value = value + + def __str__(self): + return f"GraphNode: {self.value} @ index: {self.index}" + + +class GraphEdge: + def __init__(self, value): + self.index = None + self.value = value + + def __str__(self): + return f"EdgeNode: {self.value} @ index: {self.index}" +graph = rx.PyGraph() +graph.add_nodes_from([GraphNode(i) for i in range(8)]) +graph.add_edges_from([(i, i + 1, GraphEdge(1)) for i in range(7)]) +# Populate index attribute in GraphNode objects +for index in graph.node_indices(): + graph[index].index = index +# Populate index attribute in GraphEdge objects +for index, data in graph.edge_index_map().items(): + data[2].index = index +print("Nodes:") +for node in graph.nodes(): + print(node) +print("Edges:") +for edge in graph.edges(): + print(edge) +plt.clf() +mpl_draw(graph, with_labels=True) +length = rx.dijkstra_shortest_path_lengths(graph, 2, lambda e: e.value, 6) +print(length) + +plt.show() +# --------------- Examples ----------------------- + + +''' v1 = Q_(60, ureg.volt) a1 = Q_(4, ureg.amp) p1 = v1 * a1 @@ -58,9 +103,9 @@ p1.ito(ureg.watt) print(p1) t1 = 60 * ureg.second e1 = p1 * t1 -e1.ito(ureg.kilowatt_hour) +#e1.ito(ureg.kilowatt_hour) print(e1) - +''' ''' d1 = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"]) pk, sk = d1.keygen([random.randint(0, numpy.iinfo('uint32').max), random.randint(0, numpy.iinfo('uint32').max), @@ -91,7 +136,7 @@ async def lifespan(app: FastAPI): scheduler.shutdown() -#fast_app = FastAPI(lifespan=lifespan) +# fast_app = FastAPI(lifespan=lifespan) # start the server ''' diff --git a/offer.py b/offer.py index 1550143cf71196540f1cdd5363b61e2fa5cfe789..8cd5d55c38fc6ad3daad6c92c77558b8e42df832 100644 --- a/offer.py +++ b/offer.py @@ -1,4 +1,6 @@ import exchange + + class Offer(exchange.Exchange): - def __init__(self,pk: bytes, p: float, ppw: float): - exchange.Exchange.__init__(self,pk, p, ppw) + def __init__(self, pk: bytes, p: float, ppw: float): + exchange.Exchange.__init__(self, pk, p, ppw) diff --git a/participant.py b/participant.py index ed7d5d36ae4ae581bd2f306f69d46bf5c42fe528..3b9a8ad2e8101bc9faaf9ae071702757fb4db0ef 100644 --- a/participant.py +++ b/participant.py @@ -1,4 +1,4 @@ -from pint import UnitRegistry +from pint import UnitRegistry, set_application_registry import locale import time @@ -45,6 +45,7 @@ import logging logger = logging.getLogger(__name__) ureg = UnitRegistry() Q_ = ureg.Quantity +set_application_registry(ureg) class Participant: @@ -66,13 +67,11 @@ class Participant: self.gridEdgeIndexes: dict[bytes:set[int]] = defaultdict(set) self.knownIPs: set[ipaddress.IPv4Address] = set() # known IPs from other participants self.remoteParticipants: set[RemoteParticipant] = set() - self.availableExchanges: set[ - Exchange] = set() # known available exchanges from other participants for next turns - self.ExchangeQueue: list[Request | Offer] = list() # own exchanges for future turns + self.ExchangeQueue: list[Exchange] = list() # own exchanges for future turns self.activeTrades: set[Trade] = set() # own active trades for this turn self.__tradeHistory: list[Trade] = [] # every own past trade self.solarWattage = solar_wattage = solar_wattage - self.__currentPower = Q_(0, ureg.watt) # real time power exchange with the grid + self.__availablePower = Q_(0, ureg.watt) # possible power exchange with the grid self.__energyOffset = Q_(0, ureg.watt * ureg.hour) # energy offset for the next round self.__currentInHouseDemand = Q_(init_demand, ureg.watt) # real time demand from household appliances, storage self.__pastInHouseDemand = list() @@ -84,6 +83,7 @@ class Participant: self.fastRouter.add_api_route("/allRemoteParticipantsJSON", self.allRemoteParticipantsJSON, methods=["GET"]) self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteJSON, methods=["GET"]) + self.fastRouter.add_api_route("/getOffersJSON", self.getOffersJSON, methods=["GET"]) self.fastRouter.add_api_route("/get_knownIPs", self.get_knownIPs, methods=["GET"]) self.fastRouter.add_api_route("/getgridpng", self.getGridPNG, methods=["GET"]) # announce self on dht @@ -101,16 +101,24 @@ class Participant: # set up scheduling for tasks self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=15, id='1', name='requestRemoteParticipants') + self.scheduler.add_job(self.updateCurrentPower, 'interval', seconds=2, id='2', name='updateCurrentPower') logger.info("~~~>Finished setup on " + self.publicIP.__str__()) def as_remote_participant(self) -> RemoteParticipant: - return RemoteParticipant(pk=self.publicKey, ip=self.publicIP, nex=self.availableExchanges, + return RemoteParticipant(pk=self.publicKey, ip=self.publicIP, aex=self.ExchangeQueue, adjlines=self.adjacentLines) async def asRemoteJSON(self): return jsonpickle.encode(self.as_remote_participant()) + async def getOffersJSON(self): + offers = [] + for ex in self.ExchangeQueue: + if isinstance(ex, Offer): + offers.append(ex) + return jsonpickle.encode(offers) + async def dht_startup(self) -> None: logger.info("~~~> dht_startup on " + self.publicIP.__str__()) for ip in self.dht_endpoints: # register participant on all dht endpoints @@ -136,11 +144,21 @@ class Participant: for new_rp in new_rps: for rp in target_rps: if new_rp == rp: # found same participant - rp.update_nextExchanges(new_rp.nextExchanges) + rp.update_nextExchanges(new_rp.availableExchanges) rp.timestamp = new_rp.timestamp else: # participant is new target_rps.add(new_rp) + async def calc_distance(self, gridnodeindex: int) -> float: + distdict = rx.dijkstra_shortest_path_lengths(self.grid, gridnodeindex, lambda l: l.loss, + self.gridNodeIndexes[self.publicKey]) + return distdict[self.gridNodeIndexes[self.publicKey]] + + async def add_distance_to_participants(self, pk: bytes, dist: float): + for rp in self.remoteParticipants: # this is bad but rps is a set now, maybe change to list + if rp.publicKey == pk: + rp.distance = dist # this is the distance ins loss from the grid + async def insert_participants_into_grid(self, all_rps: set[RemoteParticipant]): for rp in all_rps: if rp.publicKey not in self.gridNodeIndexes.keys(): # found new participant, add and connect it @@ -156,6 +174,10 @@ class Participant: if trafo.publicKey in self.gridNodeIndexes.keys(): i_edge = self.grid.add_edge(i, self.gridNodeIndexes[trafo.publicKey], rline) self.gridEdgeIndexes[rline.publicKey].add(i_edge) + # this is actually stupid, when the distance changes based on the load of the line + # we have to calculate this a when matching exchanges + # but as a first step to get an order in which to contact participants this is fine + await self.add_distance_to_participants(rp.publicKey, await self.calc_distance(i)) break async def request_remoteparticipants(self) -> None: @@ -251,14 +273,80 @@ class Participant: return HTMLResponse(content=html_content) async def produceNextTrade(self): - pass + + # here we can take into account if there is a variable consumer in the house (e.g. electric car or heat pump) + # and if a battery is available to take care of the fluctuations or to even supply energy to the grid + # when the price is right + + # this will currently only produce an exchange for the next minute, any market actions of creating future + # exchanges are not implemented and not in scope of this thesis + + # set projected energy to current energy, a better forecast would be nice + self.projectedInHouseSupply = self.currentInHouseSupply + self.__projectedInHouseDemand = self.__currentInHouseDemand + + power = self.projectedInHouseSupply - self.__projectedInHouseDemand + ex_next_min = False + + logger.info("===> checking if new exchange has to be created on " + self.publicIP.__str__()) + + for t in self.ExchangeQueue: # only produce a new exchange if there is no exchange for the next minute + if t.is_min_next(): + ex_next_min = True + break + + if power > Q_(0, ureg.watt) and not ex_next_min: + # more energy will be available than needed, so we can create an offer + o = Offer(self.publicKey, power, 0.3) # price should be dynamic, no time :D + self.ExchangeQueue.append(o) + logger.info("===> created offer on " + self.publicIP.__str__()) + else: + if power < Q_(0, ureg.watt) and not ex_next_min: + # more energy is needed than available, so we create a request + r = Request(self.publicKey, power, 0.3) + self.ExchangeQueue.append(r) + logger.info("===> created request on " + self.publicIP.__str__()) + + async def findOffer(self, discarded_rps=frozenset()): + closest = self.as_remote_participant() # this is just to have a rp to compare to + for rp in self.remoteParticipants: + if rp.distance < closest.distance and rp not in discarded_rps: + closest = rp + + if closest != self.as_remote_participant(): # check if there is actually someone + async with httpx.AsyncClient() as client: + response = await client.get("http://" + closest.publicIP.__str__() + ":8000/getOffersJSON") + offers = jsonpickle.decode(response.json()) + for o in offers: + if o.is_min_next(): # once again, only trying to find an offer for the next minute + # TODO: now check if power is sufficient, for this solve pint ureg issue + logger.info("===> found offer on " + closest.publicIP.__str__()) + + + + + + else: + logger.info("===> no one to trade with on " + self.publicIP.__str__()) + return + # when no partner can be found, this will be tried again after the next updateCurrentPower + # when a request is about to "time out" because no partner has been found, the required power of + # the participant should be reduced to avoid a blackout. in a fully digitalized grid the participant + # would be able to request power from the grid + # in an overlapping period the participant simply takes the power from the grid via the old counter async def updateCurrentPower(self): - self.__currentPower = self.currentInHouseSupply - self.__currentInHouseDemand + self.__availablePower = self.currentInHouseSupply - self.__currentInHouseDemand + # simulate some random fluctuations self.__currentInHouseDemand = self.__currentInHouseDemand + Q_(uniform(-0.1, 0.1), ureg.watt) t = Q_(2, ureg.second) # this is run every 2 seconds - self.__energyOffset += self.__currentPower * t - # TODO: add scheduling for this and test it ;) + t.ito(ureg.hour) + self.__energyOffset += self.__availablePower * t + logger.info(">>>> available power: " + self.__availablePower.__str__() + " and energy offset " + + self.__energyOffset.__str__() + " on " + self.publicIP.__str__()) + await self.produceNextTrade() + if isinstance(self.ExchangeQueue[0], Request): # see if there is a request for power in front of the queue + await self.findOffer() class Weather: @@ -270,24 +358,28 @@ class Weather: self.participant = p self.scheduler = schd - self.scheduler.add_job(self.changeWeather, 'interval', seconds=60 * 5, id='2', name='changeWeather') - self.scheduler.add_job(self.setCurrentWeather, 'interval', seconds=10, id='3', name='getCurrentWeather') + run_date = datetime.now() + timedelta(hours=1, seconds=2) # +1 hour because timezones suck hard + self.scheduler.add_job(self.setCurrentWeather, 'date', run_date=run_date, id='100', ) + + self.scheduler.add_job(self.changeWeather, 'interval', seconds=300, id='101', name='changeWeather') + self.scheduler.add_job(self.setCurrentWeather, 'interval', seconds=10, id='102', name='setCurrentWeather') async def changeWeather(self): if self.cloudCoverage > 0.7: - self.cloudCoverage -= uniform(0.0, 0.2) + self.cloudCoverage -= uniform(0.0, 0.1) else: if self.cloudCoverage < 0.3: - self.cloudCoverage += uniform(0.0, 0.2) + self.cloudCoverage += uniform(0.0, 0.1) else: - self.cloudCoverage += uniform(-0.3, 0.3) + self.cloudCoverage += uniform(-0.2, 0.2) async def setCurrentWeather(self): - self.participant.currentInHouseSupply = Q_((self.sunshine + uniform(-0.05, 0.05) - self.cloudCoverage + - uniform(-0.05, 0.05)) * self.participant.solarWattage, ureg.watt) + self.participant.currentInHouseSupply = Q_(max(0.0, (self.sunshine + uniform(-0.05, 0.05) - self.cloudCoverage + + uniform(-0.05, 0.05))) * self.participant.solarWattage, + ureg.watt) logger.info(">>>> setting current weather on " + self.participant.publicIP.__str__() + "with sunshine" + - self.sunshine.__str__() + "and cloud coverage" + self.cloudCoverage.__str__() + "producing " + - self.participant.currentInHouseSupply.__str__(), "watt") + self.sunshine.__str__() + "and cloud coverage " + self.cloudCoverage.__str__() + "producing " + + self.participant.currentInHouseSupply.__str__() + " watt") if __name__ == "__main__": diff --git a/remoteparticipant.py b/remoteparticipant.py index 0437c1077f0af2af94a31f6f86de175c9097c5d0..52e6b10ebce36c33c02572748b0d120d3b409e7c 100644 --- a/remoteparticipant.py +++ b/remoteparticipant.py @@ -2,6 +2,7 @@ import ipaddress import time from datetime import datetime, timedelta +import numpy from pydantic.dataclasses import dataclass import exchange @@ -9,23 +10,22 @@ import remoteline class RemoteParticipant: - def __init__(self, pk: bytes, ip: ipaddress.IPv4Address, nex: set[exchange.Exchange], + def __init__(self, pk: bytes, ip: ipaddress.IPv4Address, aex: list[exchange.Exchange], adjlines: set[remoteline.RemoteLine]): self.publicKey: bytes = pk self.publicIP: ipaddress.IPv4Address = ip - self.nextExchanges: set[exchange.Exchange] = nex + self.availableExchanges: list[exchange.Exchange] = aex self.adjacentLines: set[remoteline.RemoteLine] = adjlines self.timestamp = time.time_ns() + # a participant will calc the distance in electric loss to this participant when inserting + self.distance: float = numpy.iinfo(float).max - def update_nextExchanges(self, exchanges: set[exchange.Exchange]): - for ex in self.nextExchanges: # clear all old exchanges + def update_nextExchanges(self, exchanges: list[exchange.Exchange]): + for ex in self.availableExchanges: # clear all old exchanges if ex.is_old(): - self.nextExchanges.remove(ex) - - self.nextExchanges = self.nextExchanges.union(exchanges) - - + self.availableExchanges.remove(ex) + self.availableExchanges = self.availableExchanges.extend(exchanges) def is_min_now(self) -> bool: now = datetime.now()