Newer
Older
from pint import UnitRegistry, set_application_registry

Johannes Hitzinger
committed
from contextlib import asynccontextmanager

Johannes Hitzinger
committed
from random import randint, uniform
import asyncio
import httpx
from rustworkx.visualization import mpl_draw
from rustworkx.visit import DijkstraVisitor
import base64
from io import BytesIO
from fastapi.responses import HTMLResponse
import matplotlib.pyplot as plt

Johannes Hitzinger
committed
import jsonpickle
import uvicorn
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler

Johannes Hitzinger
committed
from dilithium import Dilithium
import dilithium.dilithium
from util import get_pub_ip, next_minute
from offer import Offer
from request import Request
from trade import Trade
from remoteconnection import RemoteConnection
from remoteline import RemoteLine
from remotetransformer import RemoteTransformer

Johannes Hitzinger
committed
from remoteparticipant import RemoteParticipant
import logging
logger = logging.getLogger(__name__)

Johannes Hitzinger
committed
ureg = UnitRegistry()
Q_ = ureg.Quantity
set_application_registry(ureg)

Johannes Hitzinger
committed
class Participant:
def __init__(self, nid: str, dht_ep: set[ipaddress.IPv4Address], con_ip: ipaddress.IPv4Address,

Johannes Hitzinger
committed
scd: AsyncIOScheduler, init_demand: float, solar_wattage: int, ip: ipaddress = get_pub_ip()):
self.dil = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"])
self.__secretKey, self.publicKey = self.dil.keygen(
[randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max),
randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max)])

Johannes Hitzinger
committed
self.fastRouter = APIRouter()
self.dht_endpoints: set[ipaddress.IPv4Address] = dht_ep
self.__dht_network_id = nid
self.publicIP: ipaddress.IPv4Address = ipaddress.IPv4Address(
ip) # set to current ipv4, confirm before each new round
self.grid: rx.PyGraph = rx.PyGraph()

Johannes Hitzinger
committed
self.gridNodeIndexes: dict[bytes:int] = dict() # mapping of public keys to indexes in the grid Graph
self.gridEdgeIndexes: dict[bytes:set[int]] = defaultdict(set)
self.knownIPs: set[ipaddress.IPv4Address] = set() # known IPs from other participants
self.remoteParticipants: set[RemoteParticipant] = set()
self.ExchangeQueue: list[Exchange] = list() # own exchanges for future turns

Johannes Hitzinger
committed
self.activeTrades: set[Trade] = set() # own active trades for this turn
self.__tradeHistory: list[Trade] = [] # every own past trade

Johannes Hitzinger
committed
self.solarWattage = solar_wattage = solar_wattage
self.__availablePower = Q_(0, ureg.watt) # possible power exchange with the grid

Johannes Hitzinger
committed
self.__energyOffset = Q_(0, ureg.watt * ureg.hour) # energy offset for the next round
self.__currentInHouseDemand = Q_(init_demand, ureg.watt) # real time demand from household appliances, storage
self.__pastInHouseDemand = list()
self.__projectedInHouseDemand = Q_(init_demand, ureg.watt) # expected demand for next round
self.currentInHouseSupply = Q_(0, ureg.watt) # real time in house production from solar, wind, storage, ...
self.projectedInHouseSupply = Q_(0, ureg.watt) # expected supply for next round
# register rest endpoints with fastapi
self.fastRouter.add_api_route("/allRemoteParticipantsJSON", self.allRemoteParticipantsJSON,
self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteJSON, methods=["GET"])
self.fastRouter.add_api_route("/getOffersJSON", self.getOffersJSON, methods=["GET"])
self.fastRouter.add_api_route("/get_knownIPs", self.get_knownIPs, methods=["GET"])
self.fastRouter.add_api_route("/getgridpng", self.getGridPNG, methods=["GET"])
# announce self on dht
asyncio.run(self.dht_startup())
# self.loop.run_in_executor(None, self.dht_startup)
# get potential peers from dht
asyncio.run(self.dht_update_peers())
# retrieve information from my grid connection
asyncio.run(self.buildOwnLine(con_ip))
# discover the grid
time.sleep(10)

Johannes Hitzinger
committed
logger.info("~~~> starting discoverGrid") # participants will be added after each request_remote_participants
asyncio.run(self.discoverGrid())
# set up scheduling for tasks
self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=15, id='1',
name='requestRemoteParticipants')
self.scheduler.add_job(self.updateCurrentPower, 'interval', seconds=5, id='2', name='updateCurrentPower')

Johannes Hitzinger
committed
logger.info("~~~>Finished setup on " + self.publicIP.__str__())

Johannes Hitzinger
committed
def as_remote_participant(self) -> RemoteParticipant:
return RemoteParticipant(pk=self.publicKey, ip=self.publicIP, aex=self.ExchangeQueue,
adjlines=self.adjacentLines)
async def asRemoteJSON(self):
return jsonpickle.encode(self.as_remote_participant())
async def getOffersJSON(self):
offers = []
for ex in self.ExchangeQueue:
if isinstance(ex, Offer):
offers.append(ex)
return jsonpickle.encode(offers)
async def dht_startup(self) -> None:

Johannes Hitzinger
committed
logger.info("~~~> dht_startup on " + self.publicIP.__str__())
for ip in self.dht_endpoints: # register participant on all dht endpoints
async with httpx.AsyncClient() as client:
await client.put(
"http://" + ip.__str__() + ":8000/addPeerTo/" + self.__dht_network_id + "?ip=" + self.publicIP.__str__())
async def dht_update_peers(self) -> None:
for ip in self.dht_endpoints:
async with httpx.AsyncClient() as client:
response = await client.get("http://" + ip.__str__() + ":8000/getPeersFor/" + self.__dht_network_id)
dht_ips = jsonpickle.decode(response.json())
self.knownIPs.update(dht_ips)
# fastapi call
async def allRemoteParticipantsJSON(self): # all known participants including self

Johannes Hitzinger
committed
all_remoteparticipants: set[RemoteParticipant] = set()
all_remoteparticipants.add(self.as_remote_participant())
all_remoteparticipants.update(self.remoteParticipants)

Johannes Hitzinger
committed
return jsonpickle.encode(all_remoteparticipants)
def update_remoteparticipants(self, new_rps: set[RemoteParticipant]):
if new_rp == self.as_remote_participant(): # found self
continue
if new_rp in self.remoteParticipants: # found same participant
for rp in self.remoteParticipants:
if rp == new_rp:
rp.update_nextExchanges(new_rp.availableExchanges)
rp.timestamp = new_rp.timestamp
else: # participant is new
self.remoteParticipants.add(new_rp)
async def calc_distance(self, gridnodeindex: int) -> float:
distdict = rx.dijkstra_shortest_path_lengths(self.grid, gridnodeindex, lambda l: l.loss,
self.gridNodeIndexes[self.publicKey])
return distdict[self.gridNodeIndexes[self.publicKey]]
async def add_distance_to_participants(self, pk: bytes, dist: float):
for rp in self.remoteParticipants: # this is bad but rps is a set now, maybe change to list
if rp.publicKey == pk:
rp.distance = dist # this is the distance ins loss from the grid
async def insert_participants_into_grid(self, all_rps: set[RemoteParticipant]):
for rp in all_rps:

Johannes Hitzinger
committed
if rp.publicKey not in self.gridNodeIndexes.keys(): # found new participant, add and connect it
logger.info("===> found unknown participant: " + rp.publicIP.__str__())
i = self.grid.add_node(rp)

Johannes Hitzinger
committed
self.gridNodeIndexes[rp.publicKey] = i
for line in rp.adjacentLines: # should only be one line that connects the participant to the grid
async with httpx.AsyncClient() as client:
response_edge = await client.get("http://" + line.publicIP.__str__() + ":8000/asRemoteJSON")
logger.info("===> insert_part: getting line from: " + line.publicIP.__str__())
rline: RemoteLine = jsonpickle.decode(response_edge.json())
for trafo in rline.adjacentConnections: # participant should be connected to a transformer

Johannes Hitzinger
committed
if trafo.publicKey in self.gridNodeIndexes.keys():
i_edge = self.grid.add_edge(i, self.gridNodeIndexes[trafo.publicKey], rline)
self.gridEdgeIndexes[rline.publicKey].add(i_edge)
# this is actually stupid, when the distance changes based on the load of the line
# we have to calculate this a when matching exchanges
# but as a first step to get an order in which to contact to participants, this is fine
await self.add_distance_to_participants(rp.publicKey, await self.calc_distance(i))
break
async def request_remoteparticipants(self) -> None:

Johannes Hitzinger
committed
logger.info("~~~> requesting remote participants on " + self.publicIP.__str__())

Johannes Hitzinger
committed
all_remoteparticipants: set[RemoteParticipant] = set()
if len(self.remoteParticipants) == 0: # initial discovery via dht
await self.dht_update_peers()
for ip in self.knownIPs:
try:
url = 'http://' + ip.__str__() + ':8000/allRemoteParticipantsJSON'
async with httpx.AsyncClient() as client:
response = await client.get(url)
remoteparticipants: set[RemoteParticipant] = jsonpickle.decode(response.json())
# some validation of remoteparticipants should happen, not in scope of this thesis
all_remoteparticipants.update(remoteparticipants)
except httpx.HTTPError as err:
logger.error("httpx Error: " + err.__str__())
continue
else: # continued peer updates without dht, periodic dht discovery should still happen to discover new users
for rp in self.remoteParticipants:
try:
url = 'http://' + rp.publicIP.__str__() + ':8000/allRemoteParticipantsJSON'
async with httpx.AsyncClient() as client:
response = await client.get(url)
remoteparticipants: set[RemoteParticipant] = jsonpickle.decode(response.json())
# some validation of remoteparticipants should happen, not in scope of this thesis
all_remoteparticipants.update(remoteparticipants)
except httpx.HTTPError as err:
logger.error("httpx Error: " + err.__str__())
continue
self.update_remoteparticipants(all_remoteparticipants)
await self.insert_participants_into_grid(all_remoteparticipants)

Johannes Hitzinger
committed
async def get_knownIPs(self):
return jsonpickle.encode(self.knownIPs)
async def buildOwnLine(self, con_ip: ipaddress.IPv4Address):
await asyncio.sleep(1) # sleep to make sure that the connection is already up
async with httpx.AsyncClient() as client:
response = await client.get("http://" + con_ip.__str__() + ":8000/asRemoteJSON")
line = jsonpickle.decode(response.json())
self.adjacentLines.add(line)
logger.info(
"===> finished buildOwnLine on " + self.publicIP.__str__() + " with " + line.publicIP.__str__())
async def discoverGrid(self): # this is not robust
i: int = self.grid.add_node(self.as_remote_participant())

Johannes Hitzinger
committed
self.gridNodeIndexes[self.publicKey] = i
async def discoverFromNode(self, nodepublickey: bytes):

Johannes Hitzinger
committed
for lin in self.grid[self.gridNodeIndexes[nodepublickey]].adjacentLines: # for lines connected to part/trafo
async with httpx.AsyncClient() as client:
response_edge = await client.get("http://" + lin.publicIP.__str__() + ":8000/asRemoteJSON")
logger.info("===> getting line from: " + lin.publicIP.__str__())
rlin: RemoteLine = jsonpickle.decode(response_edge.json())

Johannes Hitzinger
committed
if con.publicKey not in self.gridNodeIndexes.keys():
response_node = await client.get("http://" + con.publicIP.__str__() + ":8000/asRemoteJSON")
logger.info("===> getting con from: " + con.publicIP.__str__())
rcon = jsonpickle.decode(response_node.json())
i_node = self.grid.add_node(rcon)
logger.info("===> created node for " + rcon.publicIP.__str__())

Johannes Hitzinger
committed
self.gridNodeIndexes[rcon.publicKey] = i_node
i_edge = self.grid.add_edge(self.gridNodeIndexes[nodepublickey], i_node, rlin)
logger.info("===> created edge")

Johannes Hitzinger
committed
self.gridEdgeIndexes[rlin.publicKey].add(i_edge)
await self.discoverFromNode(rcon.publicKey)
else:
logger.info("===> already known node: " + con.publicIP.__str__())

Johannes Hitzinger
committed
if rlin.publicKey not in self.gridEdgeIndexes.keys() and nodepublickey != con.publicKey:
i_edge = self.grid.add_edge(self.gridNodeIndexes[nodepublickey],
self.gridNodeIndexes[con.publicKey], rlin)
self.gridEdgeIndexes[rlin.publicKey].add(i_edge)
logger.info("===> created edge")
async def getGridPNG(self):
plt.clf()
img = mpl_draw(self.grid, with_labels=True, edge_labels=str, labels=str)
buf = BytesIO()
img.savefig(buf, format='png')
buf.seek(0)
img_base64 = base64.b64encode(buf.read()).decode('utf-8')
html_content = f"""
<html>
<body>
<img src="data:image/png;base64,{img_base64}" />
</body>
</html>
"""
return HTMLResponse(content=html_content)
async def produceNextTrade(self):
# here we can take into account if there is a variable consumer in the house (e.g. electric car or heat pump)
# and if a battery is available to take care of the fluctuations or to even supply energy to the grid
# when the price is right
# this will currently only produce an exchange for the next minute, any market actions of creating future
# exchanges are not implemented and not in scope of this thesis
# set projected energy to current energy, a better forecast would be nice
self.projectedInHouseSupply = self.currentInHouseSupply
self.__projectedInHouseDemand = self.__currentInHouseDemand
power = self.projectedInHouseSupply - self.__projectedInHouseDemand
ex_next_min = False
logger.info("===> checking if new exchange has to be created on " + self.publicIP.__str__())
for t in self.ExchangeQueue: # only produce a new exchange if there is no exchange for the next minute
if t.is_min_next():
ex_next_min = True
if power > Q_(0, ureg.watt) and not ex_next_min:
# more energy will be available than needed, so we can create an offer
# for now all offers will be at the same price, in a real world scenario this would be dynamic
# additionally, all offers will be for the next minute, when the system is able to take variable
# consumers (e.g. electric cars and heat pumps) into account, this can be adapted to request
# bigger amounts of energy in a foreseeable fashion
o = Offer(self.publicKey, power, 0.3, next_minute()) # price should be dynamic, no time :D
self.ExchangeQueue.append(o)
logger.info("===> created offer on " + self.publicIP.__str__())
else:
if power < Q_(0, ureg.watt) and not ex_next_min:
# more energy is needed than available, so we create a request
# for now all requests will be at the same price, in a real world scenario this would be dynamic
# additionally, all requests will be for the next minute, when the system is able to take variable
# consumers (e.g. electric cars and heat pumps) into account, this can be adapted to request
# bigger amounts of energy in a foreseeable fashion
r = Request(self.publicKey, power, 0.3, next_minute())
self.ExchangeQueue.append(r)
logger.info("===> created request on " + self.publicIP.__str__())
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
async def findRoute(self, start: RemoteParticipant, end: RemoteParticipant) -> Route:
s = self.gridNodeIndexes[start.publicKey]
e = self.gridNodeIndexes[end.publicKey]
path = rx.dijkstra_shortest_paths(self.grid, s, e, lambda l: l.loss)
routeloss = rx.dijkstra_shortest_path_lengths(self.grid, s, lambda l: l.loss, e)
graph = self.grid.subgraph(path[e])
return Route(start, end, graph, routeloss)
async def announceRoute(self, t: Trade):
class GridVisitor(DijkstraVisitor):
def __init__(self, trade: Trade):
super().__init__()
self.examined_edges = set()
self.trade: Trade = trade
def discover_vertex(self, vertex, distance):
logger.info(f"Discovered Trafo: {vertex}")
def examine_edge(self, edge):
for e in self.examined_edges:
if edge[0] == e[1] and edge[1] == e[0]:
logger.info(f"Line already examined: {edge}")
return
logger.info(f"Examining Line: {edge}")
self.examined_edges.add(edge)
visitor = GridVisitor(t)
# walk the route and announce the route to all grid elements involved
rx.dijkstra_search(t.route.graph, [self.gridNodeIndexes[t.route.start.publicKey]], lambda _: 1.0, visitor)
async def findOffer(self, discarded_rps=frozenset()):
closest = self.as_remote_participant() # this is just to have a rp to compare to
for rp in self.remoteParticipants:
if rp.distance < closest.distance and rp not in discarded_rps:
logger.info("===> found closer participant: " + rp.publicIP.__str__())
closest = rp
if closest != self.as_remote_participant(): # check if there is actually someone
async with httpx.AsyncClient() as client:
response = await client.get("http://" + closest.publicIP.__str__() + ":8000/getOffersJSON")
offers: list[Offer] = jsonpickle.decode(response.json())
o.set_ureg(ureg) # update the ureg of the offer for pint to work
if o.is_min_next(): # once again, only trying to find an offer for the next minute
if o.power >= self.ExchangeQueue[0].power:
# the offer is big enough to fulfill the request
# now find the route to the offering participant
r = await self.findRoute(self.as_remote_participant(), closest)
logger.info("===> found route to " + closest.publicIP.__str__())
# create a trade, include offer and request, use power from request since we ensured that
# the offer is big enough, use price from offer as this is the price that will be paid
# and again, only trades for the next minute are possible at this point
t = Trade(o, self.ExchangeQueue[0], self.ExchangeQueue[0].power, o.pricePerWatt, r,
next_minute())
t.set_ureg(ureg)
# now announce the route to all connections involved
await self.announceRoute(t)
logger.info("===> found offer on " + closest.publicIP.__str__())
else:
logger.info("===> no one to trade with on " + self.publicIP.__str__())
return
# when no partner can be found, this will be tried again after the next updateCurrentPower
# when a request is about to "time out" because no partner has been found, the required power of
# the participant should be reduced to avoid a blackout. in a fully digitalized grid the participant
# would be able to request power from the grid
# in an overlapping period the participant simply takes the power from the grid via the old counter

Johannes Hitzinger
committed
async def updateCurrentPower(self):
self.__availablePower = self.currentInHouseSupply - self.__currentInHouseDemand
# simulate some random fluctuations

Johannes Hitzinger
committed
self.__currentInHouseDemand = self.__currentInHouseDemand + Q_(uniform(-0.1, 0.1), ureg.watt)
t = Q_(5, ureg.second) # this is run every 5 seconds
t.ito(ureg.hour)
self.__energyOffset += self.__availablePower * t
logger.info(">>>> available power: " + self.__availablePower.__str__() + " and energy offset " +
self.__energyOffset.__str__() + " on " + self.publicIP.__str__())
await self.produceNextTrade()
if len(self.ExchangeQueue) > 0:
if isinstance(self.ExchangeQueue[0], Request): # see if there is a request for power in front of the queue
logger.info("===> trying to find an offer on " + self.publicIP.__str__())
await self.findOffer()

Johannes Hitzinger
committed
class Weather:
# Weather might be better done using this but time is running out
# https://pvlib-python.readthedocs.io/en/v0.6.0/forecasts.html
def __init__(self, p: Participant, cc: float, sh: float, schd: AsyncIOScheduler):
self.cloudCoverage = cc
self.sunshine = sh
self.participant = p
self.scheduler = schd
run_date = datetime.now() + timedelta(hours=1, seconds=1) # +1 hour because timezones suck hard
self.scheduler.add_job(self.setCurrentWeather, 'date', run_date=run_date, id='100', )
self.scheduler.add_job(self.changeWeather, 'interval', seconds=300, id='101', name='changeWeather')
self.scheduler.add_job(self.setCurrentWeather, 'interval', seconds=10, id='102', name='setCurrentWeather')

Johannes Hitzinger
committed
async def changeWeather(self):
if self.cloudCoverage > 0.7:
self.cloudCoverage -= uniform(0.0, 0.1)

Johannes Hitzinger
committed
else:
if self.cloudCoverage < 0.3:
self.cloudCoverage += uniform(0.0, 0.1)

Johannes Hitzinger
committed
else:
self.cloudCoverage += uniform(-0.2, 0.2)

Johannes Hitzinger
committed
async def setCurrentWeather(self):
self.participant.currentInHouseSupply = Q_(max(0.0, (self.sunshine + uniform(-0.05, 0.05) - self.cloudCoverage +
uniform(-0.05, 0.05))) * self.participant.solarWattage,
ureg.watt)

Johannes Hitzinger
committed
logger.info(">>>> setting current weather on " + self.participant.publicIP.__str__() + "with sunshine" +
self.sunshine.__str__() + "and cloud coverage " + self.cloudCoverage.__str__() + "producing " +
self.participant.currentInHouseSupply.__str__() + " watt")

Johannes Hitzinger
committed
parser = argparse.ArgumentParser(description='Participant in the Grid')
parser.add_argument('--con', type=str, required=True, help='the cable that the participant is connected to')

Johannes Hitzinger
committed
parser.add_argument('--cc', type=float, required=True, help='initial cloud coverage')
parser.add_argument('--sh', type=float, required=True, help='initial sunshine')
parser.add_argument('--dmd', type=float, required=True, help='initial demand')
parser.add_argument('--solar', type=int, required=True, help='solar wattage')
args = parser.parse_args()
jobstores = {
'default': MemoryJobStore()
}
scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
logging.basicConfig(
level=logging.INFO,
handlers=[
logging.StreamHandler()
]
)
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler.start()
yield
scheduler.shutdown()
fast_app = FastAPI(lifespan=lifespan)
part = Participant(nid="test_network", dht_ep={ipaddress.IPv4Address('172.20.0.2')}, scd=scheduler,

Johannes Hitzinger
committed
init_demand=args.dmd, solar_wattage=args.solar, con_ip=ipaddress.IPv4Address(args.con))
fast_app.include_router(part.fastRouter)

Johannes Hitzinger
committed
w = Weather(part, args.cc, args.sh, scheduler)
uvicorn.run(fast_app, host=get_pub_ip(), port=8000)