diff --git a/docker-compose.yaml b/docker-compose.yaml index f5148257e3d8dcda653d841ddb540247eb135d7b..8ebdd355cbc6c37d3088768417bdd6c71fbf2d96 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -118,7 +118,7 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile - command: [ "python", "./participant.py", "--con", "172.20.0.101"] + command: [ "python", "./participant.py", "--con", "172.20.0.101", "--cc", "0.3", "--sh", "0.7", "--dmd", "1000", "--solar", "4000"] part12: networks: network1: @@ -128,7 +128,7 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile - command: [ "python", "./participant.py", "--con", "172.20.0.105"] + command: [ "python", "./participant.py", "--con", "172.20.0.105", "--cc", "0.3", "--sh", "0.7", "--dmd", "300", "--solar", "4000"] part13: networks: network1: @@ -138,4 +138,4 @@ services: build: context: . dockerfile: /home/jonny0815/git/dismagr/participant/dockerfile - command: [ "python", "./participant.py", "--con", "172.20.0.106"] \ No newline at end of file + command: [ "python", "./participant.py", "--con", "172.20.0.106", "--cc", "0.8", "--sh", "0.2", "--dmd", "1000", "--solar", "4000"] \ No newline at end of file diff --git a/main.py b/main.py index 6e3809c768ca6cb566409e7e331eab10db3801ad..0b25a83ea589ae996703c7988cf72f66c37dfc29 100644 --- a/main.py +++ b/main.py @@ -50,13 +50,18 @@ def get_ip(): # --------------- Examples ----------------------- -''' -v1 = Q_(5, ureg.volt) + +v1 = Q_(60, ureg.volt) a1 = Q_(4, ureg.amp) p1 = v1 * a1 p1.ito(ureg.watt) -#print(p1) +print(p1) +t1 = 60 * ureg.second +e1 = p1 * t1 +e1.ito(ureg.kilowatt_hour) +print(e1) +''' d1 = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"]) pk, sk = d1.keygen([random.randint(0, numpy.iinfo('uint32').max), random.randint(0, numpy.iinfo('uint32').max), random.randint(0, numpy.iinfo('uint32').max), @@ -86,7 +91,7 @@ async def lifespan(app: FastAPI): scheduler.shutdown() -fast_app = FastAPI(lifespan=lifespan) +#fast_app = FastAPI(lifespan=lifespan) # start the server ''' diff --git a/participant.py b/participant.py index a34c944385478bfe040e06ec6d9629bd57f6d968..ed7d5d36ae4ae581bd2f306f69d46bf5c42fe528 100644 --- a/participant.py +++ b/participant.py @@ -1,11 +1,12 @@ -# from main import ureg, Q_ +from pint import UnitRegistry + import locale import time from contextlib import asynccontextmanager from datetime import datetime, timedelta from collections import defaultdict -from random import randint +from random import randint, uniform import ipaddress import asyncio import httpx @@ -42,11 +43,13 @@ from fastapi import APIRouter, FastAPI import logging logger = logging.getLogger(__name__) +ureg = UnitRegistry() +Q_ = ureg.Quantity class Participant: def __init__(self, nid: str, dht_ep: set[ipaddress.IPv4Address], con_ip: ipaddress.IPv4Address, - scd: AsyncIOScheduler, ip: ipaddress = get_pub_ip()): + scd: AsyncIOScheduler, init_demand: float, solar_wattage: int, ip: ipaddress = get_pub_ip()): self.dil = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"]) self.__secretKey, self.publicKey = self.dil.keygen( [randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max), @@ -59,20 +62,23 @@ class Participant: ip) # set to current ipv4, confirm before each new round self.adjacentLines: set[RemoteLine] = set() self.grid: rx.PyGraph = rx.PyGraph() - self.gridnodeindexes: dict[bytes:int] = dict() # mapping of public keys to indexes in the grid Graph - self.gridedgeindexes: dict[bytes:set[int]] = defaultdict(set) + self.gridNodeIndexes: dict[bytes:int] = dict() # mapping of public keys to indexes in the grid Graph + self.gridEdgeIndexes: dict[bytes:set[int]] = defaultdict(set) self.knownIPs: set[ipaddress.IPv4Address] = set() # known IPs from other participants self.remoteParticipants: set[RemoteParticipant] = set() self.availableExchanges: set[ Exchange] = set() # known available exchanges from other participants for next turns - self.nextExchange: Request | Offer # own exchange for next turn, either offering or requesting energy + self.ExchangeQueue: list[Request | Offer] = list() # own exchanges for future turns self.activeTrades: set[Trade] = set() # own active trades for this turn self.__tradeHistory: list[Trade] = [] # every own past trade - # self.__currentPower = Q_(0, ureg.watt) # real time power exchange with the grid - # self.__currentInhouseDemand = Q_(0, ureg.watt) # real time demand from household appliances and storage - # self.__projectedInhouseDemand = Q_(0, ureg.watt) # expected demand for next round - # self.__currentInhouseSupply = Q_(0, ureg.watt) # real time inhouse production from solar, wind, storage, ... - # self.__projectedInhouseSupply = Q_(0, ureg.watt) # expected supply for next round + self.solarWattage = solar_wattage = solar_wattage + self.__currentPower = Q_(0, ureg.watt) # real time power exchange with the grid + self.__energyOffset = Q_(0, ureg.watt * ureg.hour) # energy offset for the next round + self.__currentInHouseDemand = Q_(init_demand, ureg.watt) # real time demand from household appliances, storage + self.__pastInHouseDemand = list() + self.__projectedInHouseDemand = Q_(init_demand, ureg.watt) # expected demand for next round + self.currentInHouseSupply = Q_(0, ureg.watt) # real time in house production from solar, wind, storage, ... + self.projectedInHouseSupply = Q_(0, ureg.watt) # expected supply for next round # register rest endpoints with fastapi self.fastRouter.add_api_route("/allRemoteParticipantsJSON", self.allRemoteParticipantsJSON, @@ -90,13 +96,13 @@ class Participant: # discover the grid time.sleep(10) - + logger.info("~~~> starting discoverGrid") # participants will be added after each request_remote_participants asyncio.run(self.discoverGrid()) # set up scheduling for tasks self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=15, id='1', name='requestRemoteParticipants') - logger.info("Finished setup on " + self.publicIP.__str__()) + logger.info("~~~>Finished setup on " + self.publicIP.__str__()) def as_remote_participant(self) -> RemoteParticipant: return RemoteParticipant(pk=self.publicKey, ip=self.publicIP, nex=self.availableExchanges, @@ -106,7 +112,7 @@ class Participant: return jsonpickle.encode(self.as_remote_participant()) async def dht_startup(self) -> None: - logger.info("dht_startup on " + self.publicIP.__str__()) + logger.info("~~~> dht_startup on " + self.publicIP.__str__()) for ip in self.dht_endpoints: # register participant on all dht endpoints async with httpx.AsyncClient() as client: await client.put( @@ -136,25 +142,24 @@ class Participant: target_rps.add(new_rp) async def insert_participants_into_grid(self, all_rps: set[RemoteParticipant]): - logger.info("===> hello from insert into grid") for rp in all_rps: - if rp.publicKey not in self.gridnodeindexes.keys(): # found new participant, add and connect it + if rp.publicKey not in self.gridNodeIndexes.keys(): # found new participant, add and connect it logger.info("===> found unknown participant: " + rp.publicIP.__str__()) i = self.grid.add_node(rp) - self.gridnodeindexes[rp.publicKey] = i + self.gridNodeIndexes[rp.publicKey] = i for line in rp.adjacentLines: # should only be one line that connects the participant to the grid async with httpx.AsyncClient() as client: response_edge = await client.get("http://" + line.publicIP.__str__() + ":8000/asRemoteJSON") logger.info("===> insert_part: getting line from: " + line.publicIP.__str__()) rline: RemoteLine = jsonpickle.decode(response_edge.json()) for trafo in rline.adjacentConnections: # participant should be connected to a transformer - if trafo.publicKey in self.gridnodeindexes.keys(): - i_edge = self.grid.add_edge(i, self.gridnodeindexes[trafo.publicKey], rline) - self.gridedgeindexes[rline.publicKey].add(i_edge) + if trafo.publicKey in self.gridNodeIndexes.keys(): + i_edge = self.grid.add_edge(i, self.gridNodeIndexes[trafo.publicKey], rline) + self.gridEdgeIndexes[rline.publicKey].add(i_edge) break async def request_remoteparticipants(self) -> None: - logger.info("===> requesting remote participants on " + self.publicIP.__str__()) + logger.info("~~~> requesting remote participants on " + self.publicIP.__str__()) all_remoteparticipants: set[RemoteParticipant] = set() if len(self.remoteParticipants) == 0: # initial discovery via dht await self.dht_update_peers() @@ -200,33 +205,33 @@ class Participant: async def discoverGrid(self): # this is not robust i: int = self.grid.add_node(self.as_remote_participant()) - self.gridnodeindexes[self.publicKey] = i + self.gridNodeIndexes[self.publicKey] = i await self.discoverFromNode(self.publicKey) async def discoverFromNode(self, nodepublickey: bytes): - for lin in self.grid[self.gridnodeindexes[nodepublickey]].adjacentLines: # for lines connected to part/trafo + for lin in self.grid[self.gridNodeIndexes[nodepublickey]].adjacentLines: # for lines connected to part/trafo async with httpx.AsyncClient() as client: response_edge = await client.get("http://" + lin.publicIP.__str__() + ":8000/asRemoteJSON") logger.info("===> getting line from: " + lin.publicIP.__str__()) rlin: RemoteLine = jsonpickle.decode(response_edge.json()) for con in rlin.adjacentConnections: - if con.publicKey not in self.gridnodeindexes.keys(): + if con.publicKey not in self.gridNodeIndexes.keys(): response_node = await client.get("http://" + con.publicIP.__str__() + ":8000/asRemoteJSON") logger.info("===> getting con from: " + con.publicIP.__str__()) rcon = jsonpickle.decode(response_node.json()) i_node = self.grid.add_node(rcon) logger.info("===> created node for " + rcon.publicIP.__str__()) - self.gridnodeindexes[rcon.publicKey] = i_node - i_edge = self.grid.add_edge(self.gridnodeindexes[nodepublickey], i_node, rlin) + self.gridNodeIndexes[rcon.publicKey] = i_node + i_edge = self.grid.add_edge(self.gridNodeIndexes[nodepublickey], i_node, rlin) logger.info("===> created edge") - self.gridedgeindexes[rlin.publicKey].add(i_edge) + self.gridEdgeIndexes[rlin.publicKey].add(i_edge) await self.discoverFromNode(rcon.publicKey) else: logger.info("===> already known node: " + con.publicIP.__str__()) - if rlin.publicKey not in self.gridedgeindexes.keys() and nodepublickey != con.publicKey: - i_edge = self.grid.add_edge(self.gridnodeindexes[nodepublickey], - self.gridnodeindexes[con.publicKey], rlin) - self.gridedgeindexes[rlin.publicKey].add(i_edge) + if rlin.publicKey not in self.gridEdgeIndexes.keys() and nodepublickey != con.publicKey: + i_edge = self.grid.add_edge(self.gridNodeIndexes[nodepublickey], + self.gridNodeIndexes[con.publicKey], rlin) + self.gridEdgeIndexes[rlin.publicKey].add(i_edge) logger.info("===> created edge") async def getGridPNG(self): @@ -248,10 +253,50 @@ class Participant: async def produceNextTrade(self): pass + async def updateCurrentPower(self): + self.__currentPower = self.currentInHouseSupply - self.__currentInHouseDemand + self.__currentInHouseDemand = self.__currentInHouseDemand + Q_(uniform(-0.1, 0.1), ureg.watt) + t = Q_(2, ureg.second) # this is run every 2 seconds + self.__energyOffset += self.__currentPower * t + # TODO: add scheduling for this and test it ;) + + +class Weather: + # Weather might be better done using this but time is running out + # https://pvlib-python.readthedocs.io/en/v0.6.0/forecasts.html + def __init__(self, p: Participant, cc: float, sh: float, schd: AsyncIOScheduler): + self.cloudCoverage = cc + self.sunshine = sh + self.participant = p + self.scheduler = schd + + self.scheduler.add_job(self.changeWeather, 'interval', seconds=60 * 5, id='2', name='changeWeather') + self.scheduler.add_job(self.setCurrentWeather, 'interval', seconds=10, id='3', name='getCurrentWeather') + + async def changeWeather(self): + if self.cloudCoverage > 0.7: + self.cloudCoverage -= uniform(0.0, 0.2) + else: + if self.cloudCoverage < 0.3: + self.cloudCoverage += uniform(0.0, 0.2) + else: + self.cloudCoverage += uniform(-0.3, 0.3) + + async def setCurrentWeather(self): + self.participant.currentInHouseSupply = Q_((self.sunshine + uniform(-0.05, 0.05) - self.cloudCoverage + + uniform(-0.05, 0.05)) * self.participant.solarWattage, ureg.watt) + logger.info(">>>> setting current weather on " + self.participant.publicIP.__str__() + "with sunshine" + + self.sunshine.__str__() + "and cloud coverage" + self.cloudCoverage.__str__() + "producing " + + self.participant.currentInHouseSupply.__str__(), "watt") + if __name__ == "__main__": parser = argparse.ArgumentParser(description='Participant in the Grid') parser.add_argument('--con', type=str, required=True, help='the cable that the participant is connected to') + parser.add_argument('--cc', type=float, required=True, help='initial cloud coverage') + parser.add_argument('--sh', type=float, required=True, help='initial sunshine') + parser.add_argument('--dmd', type=float, required=True, help='initial demand') + parser.add_argument('--solar', type=int, required=True, help='solar wattage') args = parser.parse_args() jobstores = { @@ -275,6 +320,7 @@ if __name__ == "__main__": fast_app = FastAPI(lifespan=lifespan) part = Participant(nid="test_network", dht_ep={ipaddress.IPv4Address('172.20.0.2')}, scd=scheduler, - con_ip=ipaddress.IPv4Address(args.con)) + init_demand=args.dmd, solar_wattage=args.solar, con_ip=ipaddress.IPv4Address(args.con)) fast_app.include_router(part.fastRouter) + w = Weather(part, args.cc, args.sh, scheduler) uvicorn.run(fast_app, host=get_pub_ip(), port=8000) diff --git a/trade.py b/trade.py index 6c0551ad787050f69ef0796b3de04d7988a0ec5a..1d5d2e15105f9023f055a915b3f3eb9a265fee68 100644 --- a/trade.py +++ b/trade.py @@ -1,6 +1,7 @@ # from main import ureg,Q_ import locale import time +import datetime import offer import request @@ -8,12 +9,14 @@ import route class Trade: - def __init__(self, off: offer.Offer, req: request.Request, pwr: float, ppw: float, rou: route.Route): + def __init__(self, off: offer.Offer, req: request.Request, pwr: float, ppw: float, rou: route.Route, + extime: datetime.datetime): self.__timestamp = time.time_ns() self.__offer: offer.Offer = off self.__request: request.Request = req self.__route = rou # self.__power = Q_(pwr, ureg.watt) self.__pricePerWatt = locale.currency(ppw) + self.__executionTime: datetime.datetime = extime self.sig_off: bytes self.sig_req: bytes