Skip to content
Snippets Groups Projects
participant.py 25.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • from pint import UnitRegistry, set_application_registry
    
    import locale
    
    from contextlib import asynccontextmanager
    
    Johannes Hitzinger's avatar
    Johannes Hitzinger committed
    from datetime import datetime, timedelta
    
    Johannes Hitzinger's avatar
    Johannes Hitzinger committed
    from collections import defaultdict
    
    import ipaddress
    
    import asyncio
    import httpx
    
    import rustworkx as rx
    
    from rustworkx.visualization import mpl_draw
    
    from rustworkx.visit import DijkstraVisitor
    
    import base64
    from io import BytesIO
    from fastapi.responses import HTMLResponse
    import matplotlib.pyplot as plt
    
    import uvicorn
    from apscheduler.jobstores.memory import MemoryJobStore
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    
    from dilithium import Dilithium
    import dilithium.dilithium
    
    from util import get_pub_ip, next_minute
    
    from route import Route
    
    from offer import Offer
    from request import Request
    
    from exchange import Exchange
    
    from remoteconnection import RemoteConnection
    
    from remoteline import RemoteLine
    from remotetransformer import RemoteTransformer
    
    from fastapi import APIRouter, FastAPI
    
    import logging
    
    logger = logging.getLogger(__name__)
    
    set_application_registry(ureg)
    
        def __init__(self, nid: str, dht_ep: set[ipaddress.IPv4Address], con_ip: ipaddress.IPv4Address,
                     scd: AsyncIOScheduler, init_demand: float, solar_wattage: int, ip: ipaddress = get_pub_ip()):
            """Set up keys, state and REST routes, join the DHT and schedule the periodic jobs.

            Args:
                nid: id of the DHT network this participant registers in.
                dht_ep: addresses of the DHT endpoints to announce to and to query.
                con_ip: address of the grid line this participant is attached to.
                scd: scheduler the periodic jobs are added to.
                init_demand: initial in-house demand in watts.
                solar_wattage: kept as ``solarWattage``.
                ip: own public address. The default is computed once, when this
                    method is defined, not on every call.
            """
            self.dil = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"])
            # NOTE(review): the key seed is drawn with randint -- if that is random.randint it is
            # not a cryptographically secure source; confirm this is acceptable here.
            uint32_max = numpy.iinfo('uint32').max
            self.__secretKey, self.publicKey = self.dil.keygen([randint(0, uint32_max) for _ in range(4)])

            self.scheduler = scd

            self.dht_endpoints: set[ipaddress.IPv4Address] = dht_ep
            self.__dht_network_id = nid
            # set to current ipv4, confirm before each new round
            self.publicIP: ipaddress.IPv4Address = ipaddress.IPv4Address(ip)

            self.adjacentLines: set[RemoteLine] = set()

            self.grid: rx.PyGraph = rx.PyGraph()
            self.gridNodeIndexes: dict[bytes, int] = dict()  # mapping of public keys to indexes in the grid Graph
            self.gridEdgeIndexes: dict[bytes, set[int]] = defaultdict(set)

            self.knownIPs: set[ipaddress.IPv4Address] = set()  # known IPs from other participants
            self.remoteParticipants: set[RemoteParticipant] = set()

            self.ExchangeQueue: list[Exchange] = list()  # own exchanges for future turns
            self.activeTrades: set[Trade] = set()  # own active trades for this turn
            self.__tradeHistory: list[Trade] = []  # every own past trade

            self.solarWattage = solar_wattage

            self.__availablePower = Q_(0, ureg.watt)  # possible power exchange with the grid
            self.__energyOffset = Q_(0, ureg.watt * ureg.hour)  # energy offset for the next round
            self.__currentInHouseDemand = Q_(init_demand, ureg.watt)  # real time demand from household appliances, storage
            self.__pastInHouseDemand = list()
            self.__projectedInHouseDemand = Q_(init_demand, ureg.watt)  # expected demand for next round
            self.currentInHouseSupply = Q_(0, ureg.watt)  # real time in house production from solar, wind, storage, ...
            self.projectedInHouseSupply = Q_(0, ureg.watt)  # expected supply for next round

            # register rest endpoints with fastapi
            # NOTE(review): self.fastRouter is not assigned in the visible part of this method --
            # confirm it exists before the routes are added.
            self.fastRouter.add_api_route("/allRemoteParticipantsJSON", self.allRemoteParticipantsJSON,
                                          methods=["GET"])
            self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteJSON, methods=["GET"])
            self.fastRouter.add_api_route("/getOffersJSON", self.getOffersJSON, methods=["GET"])
            self.fastRouter.add_api_route("/get_knownIPs", self.get_knownIPs, methods=["GET"])
            self.fastRouter.add_api_route("/getgridpng", self.getGridPNG, methods=["GET"])

            # announce self on dht
            asyncio.run(self.dht_startup())
            # get potential peers from dht
            asyncio.run(self.dht_update_peers())

            # retrieve information from my grid connection
            asyncio.run(self.buildOwnLine(con_ip))

            logger.info("~~~> starting discoverGrid")  # participants will be added after each request_remote_participants
            # NOTE(review): no call to discoverGrid is visible after this log line -- confirm against the original.

            # set up scheduling for tasks
            self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=15, id='1',
                                   name='requestRemoteParticipants')
            self.scheduler.add_job(self.updateCurrentPower, 'interval', seconds=5, id='2', name='updateCurrentPower')

            logger.info("~~~>Finished setup on " + str(self.publicIP))
    
        def as_remote_participant(self) -> RemoteParticipant:
            """Build a RemoteParticipant from this participant's key, IP, exchange queue and adjacent lines."""
            snapshot = RemoteParticipant(
                pk=self.publicKey,
                ip=self.publicIP,
                aex=self.ExchangeQueue,
                adjlines=self.adjacentLines,
            )
            return snapshot
    
        async def asRemoteJSON(self):
            """REST handler: return this participant's remote view encoded with jsonpickle."""
            remote_view = self.as_remote_participant()
            return jsonpickle.encode(remote_view)
    
        async def getOffersJSON(self):
            """REST handler: return the queued exchanges that are offers, encoded with jsonpickle."""
            # keep queue order, drop everything that is not an Offer
            offers = [queued for queued in self.ExchangeQueue if isinstance(queued, Offer)]
            return jsonpickle.encode(offers)
    
    
        async def dht_startup(self) -> None:
            """Register this participant's public IP for its network on every DHT endpoint.

            HTTP errors are not caught here and propagate to the caller.
            """
            logger.info("~~~> dht_startup on " + str(self.publicIP))

            # one client is enough for all endpoints
            async with httpx.AsyncClient() as client:
                for ip in self.dht_endpoints:  # register participant on all dht endpoints
                    await client.put(
                        "http://" + str(ip) + ":8000/addPeerTo/" + self.__dht_network_id + "?ip=" + str(self.publicIP))
    
    
        async def dht_update_peers(self) -> None:
            """Ask every DHT endpoint for the peers of this network and merge them into ``knownIPs``.

            HTTP errors are not caught here and propagate to the caller.
            """
            # one client is enough for all endpoints
            async with httpx.AsyncClient() as client:
                for ip in self.dht_endpoints:
                    response = await client.get("http://" + str(ip) + ":8000/getPeersFor/" + self.__dht_network_id)
                    # NOTE(review): jsonpickle.decode rebuilds arbitrary Python objects from the payload;
                    # this is only safe while every DHT endpoint is trusted.
                    dht_ips = jsonpickle.decode(response.json())
                    self.knownIPs.update(dht_ips)
    
        # fastapi call
    
        async def allRemoteParticipantsJSON(self):  # all known participants including self
            """REST handler: return this participant plus all known remote participants as jsonpickle."""
            # own entry first, then the known peers, so an equal peer entry does not replace it
            all_remoteparticipants: set[RemoteParticipant] = {self.as_remote_participant(), *self.remoteParticipants}
            return jsonpickle.encode(all_remoteparticipants)
    
    
        def update_remoteparticipants(self, new_rps: set[RemoteParticipant]):
            """Merge received participants into ``remoteParticipants``.

            The own entry is skipped, unknown participants are added, and for an
            already known participant the stored entry gets the new exchanges
            and timestamp.

            Args:
                new_rps: participants reported by peers.
            """
            me = self.as_remote_participant()  # built once instead of once per candidate
            for new_rp in new_rps:
                if new_rp == me:  # found self
                    continue
                if new_rp not in self.remoteParticipants:  # participant is new
                    self.remoteParticipants.add(new_rp)
                    continue
                # found same participant: refresh the stored entry, the set keeps its own object
                for rp in self.remoteParticipants:
                    if rp == new_rp:
                        rp.update_nextExchanges(new_rp.availableExchanges)
                        rp.timestamp = new_rp.timestamp
                        break  # a set holds at most one entry equal to new_rp
    
        async def calc_distance(self, gridnodeindex: int) -> float:
            """Return the summed line loss of the cheapest path between ``gridnodeindex`` and the own node.

            Raises:
                KeyError: if the own public key is not registered in ``gridNodeIndexes``
                    or the search result has no entry for the own node.
            """
            def line_loss(line):
                return line.loss

            own_index = self.gridNodeIndexes[self.publicKey]
            losses = rx.dijkstra_shortest_path_lengths(self.grid, gridnodeindex, line_loss, own_index)
            return losses[own_index]
    
        async def add_distance_to_participants(self, pk: bytes, dist: float):
            """Store ``dist`` on every known remote participant whose public key equals ``pk``."""
            # linear scan because remoteParticipants is a set and cannot be looked up by key
            matching = (known for known in self.remoteParticipants if known.publicKey == pk)
            for known in matching:
                known.distance = dist  # distance expressed as loss over the grid
    
    
        async def insert_participants_into_grid(self, all_rps: set[RemoteParticipant]):
            """Add participants that are not yet in the grid graph and connect them to known nodes.

            For each new participant a node is created and registered under its
            public key. Its adjacent lines are fetched from their REST endpoints
            and an edge is added to every already known connection on the line.
            After each new edge the participant's distance to the own node is
            updated.

            Args:
                all_rps: participants reported in the current discovery round.
            """
            for rp in all_rps:
                if rp.publicKey in self.gridNodeIndexes:  # already part of the graph
                    continue

                # found new participant, add and connect it
                logger.info("===> found unknown participant: " + str(rp.publicIP))
                i = self.grid.add_node(rp)
                # register the node: otherwise it is added again on every round and
                # findRoute cannot look it up by public key
                self.gridNodeIndexes[rp.publicKey] = i

                async with httpx.AsyncClient() as client:
                    for line in rp.adjacentLines:  # should only be one line that connects the participant to the grid
                        response_edge = await client.get("http://" + str(line.publicIP) + ":8000/asRemoteJSON")
                        logger.info("===> insert_part: getting line from: " + str(line.publicIP))
                        rline: RemoteLine = jsonpickle.decode(response_edge.json())
                        for trafo in rline.adjacentConnections:  # participant should be connected to a transformer
                            if trafo.publicKey == rp.publicKey:
                                # the line may list the participant itself; never create a self-loop
                                continue
                            if trafo.publicKey not in self.gridNodeIndexes:
                                continue

                            i_edge = self.grid.add_edge(i, self.gridNodeIndexes[trafo.publicKey], rline)
                            self.gridEdgeIndexes[rline.publicKey].add(i_edge)

                            # the distance changes with the load of the line, so it really has to be
                            # recalculated when matching exchanges; as a first step to get an order in
                            # which to contact participants, this is fine
                            await self.add_distance_to_participants(rp.publicKey, await self.calc_distance(i))
    
        async def request_remoteparticipants(self) -> None:
            """Collect the participant lists of all reachable peers and merge them.

            While no remote participant is known, peers are taken from the DHT;
            afterwards the known participants are asked directly. Peers that
            fail with an HTTP error are logged and skipped. The collected
            participants are merged into ``remoteParticipants`` and inserted
            into the grid graph.
            """
            logger.info("~~~> requesting remote participants on " + str(self.publicIP))

            if not self.remoteParticipants:  # initial discovery via dht
                await self.dht_update_peers()
                peer_ips = list(self.knownIPs)
            else:  # continued peer updates without dht, periodic dht discovery should still happen to discover new users
                # snapshot the addresses so the set is not iterated across awaits
                peer_ips = [rp.publicIP for rp in self.remoteParticipants]

            all_remoteparticipants: set[RemoteParticipant] = set()
            async with httpx.AsyncClient() as client:
                for ip in peer_ips:
                    try:
                        url = 'http://' + str(ip) + ':8000/allRemoteParticipantsJSON'
                        response = await client.get(url)
                        remoteparticipants: set[RemoteParticipant] = jsonpickle.decode(response.json())
                        # some validation of remoteparticipants should happen, not in scope of this thesis
                        all_remoteparticipants.update(remoteparticipants)
                    except httpx.HTTPError as err:
                        logger.error("httpx Error: " + str(err))
                        continue

            self.update_remoteparticipants(all_remoteparticipants)

            await self.insert_participants_into_grid(all_remoteparticipants)
    
        async def get_knownIPs(self):
            """REST handler: return the set of known peer IPs encoded with jsonpickle."""
            encoded_ips = jsonpickle.encode(self.knownIPs)
            return encoded_ips
    
        async def buildOwnLine(self, con_ip: ipaddress.IPv4Address):
            """Fetch the line served at ``con_ip`` and record it as adjacent to this participant.

            Args:
                con_ip: address of the line's REST endpoint.
            """
            await asyncio.sleep(1)  # sleep to make sure that the connection is already up

            async with httpx.AsyncClient() as client:
                response = await client.get("http://" + str(con_ip) + ":8000/asRemoteJSON")

            line = jsonpickle.decode(response.json())
            self.adjacentLines.add(line)
            logger.info("===> finished buildOwnLine on " + str(self.publicIP) + " with " + str(line.publicIP))
    
    
        async def discoverGrid(self):  # this is not robust
            """Seed the grid graph with this participant and explore the grid from there."""
            i: int = self.grid.add_node(self.as_remote_participant())
            # discoverFromNode looks its start node up by public key, so the index
            # has to be registered before the walk begins
            self.gridNodeIndexes[self.publicKey] = i
            await self.discoverFromNode(self.publicKey)
    
    
        async def discoverFromNode(self, nodepublickey: bytes):
            """Explore the grid recursively from the node registered under ``nodepublickey``.

            Every line adjacent to the node is fetched from its REST endpoint.
            A connection on that line that is not yet known is fetched, added as
            a node, linked through the line and explored in turn. A connection
            that is already known only gets an edge when the line has not been
            registered yet and the connection is not the start node itself.

            Args:
                nodepublickey: public key of a node already present in ``gridNodeIndexes``.
            """
            node_index = self.gridNodeIndexes[nodepublickey]

            for lin in self.grid[node_index].adjacentLines:  # for lines connected to part/trafo
                async with httpx.AsyncClient() as client:
                    response_edge = await client.get("http://" + str(lin.publicIP) + ":8000/asRemoteJSON")
                    logger.info("===> getting line from: " + str(lin.publicIP))
                    rlin: RemoteLine = jsonpickle.decode(response_edge.json())

                    for con in rlin.adjacentConnections:
                        if con.publicKey not in self.gridNodeIndexes:
                            response_node = await client.get("http://" + str(con.publicIP) + ":8000/asRemoteJSON")
                            logger.info("===> getting con from: " + str(con.publicIP))
                            rcon = jsonpickle.decode(response_node.json())
                            i_node = self.grid.add_node(rcon)
                            logger.info("===> created node for " + str(rcon.publicIP))

                            self.gridNodeIndexes[rcon.publicKey] = i_node
                            i_edge = self.grid.add_edge(node_index, i_node, rlin)
                            self.gridEdgeIndexes[rlin.publicKey].add(i_edge)

                            await self.discoverFromNode(rcon.publicKey)
                        else:
                            logger.info("===> already known node: " + str(con.publicIP))

                            if rlin.publicKey not in self.gridEdgeIndexes and nodepublickey != con.publicKey:
                                i_edge = self.grid.add_edge(node_index, self.gridNodeIndexes[con.publicKey], rlin)
                                self.gridEdgeIndexes[rlin.publicKey].add(i_edge)

                                logger.info("===> created edge")
    
        async def getGridPNG(self):
            """REST handler: draw the grid graph and return an HTML page embedding it as a base64 PNG."""
            plt.clf()
            figure = mpl_draw(self.grid, with_labels=True, edge_labels=str, labels=str)

            png_buffer = BytesIO()
            figure.savefig(png_buffer, format='png')
            # getvalue() returns the whole buffer independent of the stream position
            img_base64 = base64.b64encode(png_buffer.getvalue()).decode('utf-8')

            html_content = f"""
                <html>
                    <body>
                        <img src="data:image/png;base64,{img_base64}" />
                    </body>
                </html>
                """
            return HTMLResponse(content=html_content)
    
        async def produceNextTrade(self):
            """Queue an offer or a request for the next minute when supply and demand differ.

            Nothing is queued when an exchange for the next minute already
            exists or when projected supply equals projected demand.
            """
            # a variable consumer in the house (e.g. electric car or heat pump) or a battery that
            # absorbs fluctuations or supplies the grid when the price is right could be considered here

            # only an exchange for the next minute is produced; creating future exchanges on the
            # market is not implemented and not in scope of this thesis

            # set projected energy to current energy, a better forecast would be nice
            self.projectedInHouseSupply = self.currentInHouseSupply
            self.__projectedInHouseDemand = self.__currentInHouseDemand
            power = self.projectedInHouseSupply - self.__projectedInHouseDemand

            logger.info("===> checking if new exchange has to be created on " + str(self.publicIP))

            # only produce a new exchange if there is no exchange for the next minute;
            # every queued exchange is asked, as in a plain loop without break
            next_minute_flags = [queued.is_min_next() for queued in self.ExchangeQueue]
            if any(next_minute_flags):
                return

            zero_watt = Q_(0, ureg.watt)
            if power > zero_watt:
                # more energy will be available than needed, so we can create an offer.
                # all offers use the same price and target the next minute; with variable
                # consumers this could be adapted to trade bigger amounts in a foreseeable fashion
                o = Offer(self.publicKey, power, 0.3, next_minute())  # price should be dynamic
                self.ExchangeQueue.append(o)
                logger.info("===> created offer on " + str(self.publicIP))
            elif power < zero_watt:
                # more energy is needed than available, so we create a request, again at the
                # fixed price and for the next minute only.
                # NOTE(review): power is negative here and is passed on unchanged -- confirm that
                # Request expects a signed value.
                r = Request(self.publicKey, power, 0.3, next_minute())
                self.ExchangeQueue.append(r)
                logger.info("===> created request on " + str(self.publicIP))
    
    
        async def findRoute(self, start: RemoteParticipant, end: RemoteParticipant) -> Route:
            """Build the Route along the path with the lowest summed line loss from ``start`` to ``end``.

            The Route receives the subgraph of the nodes on that path and the
            mapping returned by ``dijkstra_shortest_path_lengths`` as its loss.

            Raises:
                KeyError: if either participant is not registered in ``gridNodeIndexes``.
            """
            def line_loss(line):
                return line.loss

            source_index = self.gridNodeIndexes[start.publicKey]
            target_index = self.gridNodeIndexes[end.publicKey]

            paths = rx.dijkstra_shortest_paths(self.grid, source_index, target_index, line_loss)
            routeloss = rx.dijkstra_shortest_path_lengths(self.grid, source_index, line_loss, target_index)
            route_graph = self.grid.subgraph(paths[target_index])
            return Route(start, end, route_graph, routeloss)
    
        async def announceRoute(self, t: Trade):
            """Walk the route graph of trade ``t`` with a Dijkstra search and log what is visited.

            At present the visitor only logs discovered vertices and examined
            lines; nothing is sent to the grid elements.
            """
            class GridVisitor(DijkstraVisitor):
                """Visitor that logs vertices and skips a line already seen in the opposite direction."""

                def __init__(self, trade: Trade):
                    super().__init__()
                    self.examined_edges = set()
                    self.trade: Trade = trade

                def discover_vertex(self, vertex, distance):
                    logger.info(f"Discovered Trafo: {vertex}")

                def examine_edge(self, edge):
                    seen_reversed = any(edge[0] == seen[1] and edge[1] == seen[0] for seen in self.examined_edges)
                    if seen_reversed:
                        logger.info(f"Line already examined: {edge}")
                        return

                    logger.info(f"Examining Line: {edge}")
                    self.examined_edges.add(edge)

            # walk the route and announce the route to all grid elements involved
            # NOTE(review): the start index comes from the full grid's gridNodeIndexes while the
            # search runs on t.route.graph -- confirm the indices agree on that graph.
            start_index = self.gridNodeIndexes[t.route.start.publicKey]
            visitor = GridVisitor(t)
            rx.dijkstra_search(t.route.graph, [start_index], lambda _: 1.0, visitor)
    
    
        async def findOffer(self, discarded_rps=frozenset()):
            """Try to match the request at the head of ``ExchangeQueue`` with an offer of the closest participant.

            The remote participant with the smallest distance that is not in
            ``discarded_rps`` is asked for its offers. For each offer for the
            next minute whose power covers the request, a route is searched, a
            Trade is built and the route is announced.

            Args:
                discarded_rps: participants that must not be chosen.
            """
            closest = self.as_remote_participant()  # this is just to have a rp to compare to
            for rp in self.remoteParticipants:
                if rp.distance < closest.distance and rp not in discarded_rps:
                    logger.info("===> found closer participant: " + str(rp.publicIP))
                    closest = rp

            if closest != self.as_remote_participant():  # check if there is actually someone
                async with httpx.AsyncClient() as client:
                    response = await client.get("http://" + str(closest.publicIP) + ":8000/getOffersJSON")

                    offers: list[Offer] = jsonpickle.decode(response.json())

                    # NOTE(review): this loop header was missing in this copy of the file and has been
                    # restored from the indentation and the use of ``o`` below.
                    for o in offers:
                        o.set_ureg(ureg)  # update the ureg of the offer for pint to work

                        if o.is_min_next():  # once again, only trying to find an offer for the next minute
                            if o.power >= self.ExchangeQueue[0].power:
                                # the offer is big enough to fulfill the request
                                # now find the route to the offering participant
                                r = await self.findRoute(self.as_remote_participant(), closest)
                                logger.info("===> found route to " + str(closest.publicIP))
                                # create a trade, include offer and request, use power from request since we ensured that
                                # the offer is big enough, use price from offer as this is the price that will be paid
                                # and again, only trades for the next minute are possible at this point
                                t = Trade(o, self.ExchangeQueue[0], self.ExchangeQueue[0].power, o.pricePerWatt, r,
                                          next_minute())
                                t.set_ureg(ureg)
                                # now announce the route to all connections involved
                                await self.announceRoute(t)

                            logger.info("===> found offer on " + str(closest.publicIP))
            else:
                # when no partner can be found, this will be tried again after the next updateCurrentPower
                # when a request is about to "time out" because no partner has been found, the required power of
                # the participant should be reduced to avoid a blackout. in a fully digitalized grid the participant
                # would be able to request power from the grid
                # in an overlapping period the participant simply takes the power from the grid via the old counter
                logger.info("===> no one to trade with on " + str(self.publicIP))
                return

        # NOTE(review): the def line below was missing in this copy of the file; the name and the
        # argument-less signature follow the scheduler job registered in __init__.
        async def updateCurrentPower(self):
            """Update available power and energy offset, then try to trade.

            Runs as the scheduler job ``updateCurrentPower``. The in-house demand
            gets a small random fluctuation, the energy offset grows by the
            available power over one five-second tick, and an exchange for the
            next minute is produced. If a Request heads the queue, an offer is
            searched for it.
            """
            self.__availablePower = self.currentInHouseSupply - self.__currentInHouseDemand
            # simulate some random fluctuations
            self.__currentInHouseDemand = self.__currentInHouseDemand + Q_(uniform(-0.1, 0.1), ureg.watt)

            t = Q_(5, ureg.second)  # this is run every 5 seconds
            t.ito(ureg.hour)  # in-place conversion so the product below is in watt hours
            self.__energyOffset += self.__availablePower * t
            logger.info(">>>> available power: " + self.__availablePower.__str__() + " and energy offset " +
                        self.__energyOffset.__str__() + " on " + self.publicIP.__str__())
            await self.produceNextTrade()

            if len(self.ExchangeQueue) > 0:
                if isinstance(self.ExchangeQueue[0], Request):  # see if there is a request for power in front of the queue
                    logger.info("===> trying to find an offer on " + self.publicIP.__str__())
                    await self.findOffer()
    
    
    
    class Weather:
        """Simulated weather that sets a participant's in-house supply from sunshine and cloud coverage.

        NOTE(review): this copy of the class is incomplete -- branch headers in
        changeWeather and the def line of setCurrentWeather are not present.
        The code is left untouched; restore the missing lines from the
        original file.
        """
        # Weather might be better done using this but time is running out
        # https://pvlib-python.readthedocs.io/en/v0.6.0/forecasts.html
        def __init__(self, p: Participant, cc: float, sh: float, schd: AsyncIOScheduler):
            """Store the weather state and register the weather jobs on the scheduler.

            Args:
                p: participant whose ``currentInHouseSupply`` is set by the weather.
                cc: initial cloud coverage.
                sh: initial sunshine.
                schd: scheduler the jobs are added to.
            """
            self.cloudCoverage = cc
            self.sunshine = sh
            self.participant = p
            self.scheduler = schd
    
    
            run_date = datetime.now() + timedelta(hours=1, seconds=1)  # +1 hour because timezones suck hard
    
            # one-off run of setCurrentWeather at run_date
            self.scheduler.add_job(self.setCurrentWeather, 'date', run_date=run_date, id='100', )
    
            # periodic jobs: change the weather every 300 s, apply it to the participant every 10 s
            self.scheduler.add_job(self.changeWeather, 'interval', seconds=300, id='101', name='changeWeather')
            self.scheduler.add_job(self.setCurrentWeather, 'interval', seconds=10, id='102', name='setCurrentWeather')
    
    
        async def changeWeather(self):
            """Randomly adjust the cloud coverage."""
            if self.cloudCoverage > 0.7:
    
                # coverage is high: only allow it to shrink
                self.cloudCoverage -= uniform(0.0, 0.1)
    
                # NOTE(review): the branch headers for the next two statements are missing in this
                # copy; judging by the indentation they sat inside a nested if/else -- the condition
                # cannot be recovered from here.
                    self.cloudCoverage += uniform(0.0, 0.1)
    
                    self.cloudCoverage += uniform(-0.2, 0.2)
    
            # NOTE(review): the statements below look like the body of setCurrentWeather (scheduled in
            # __init__), whose def line is missing in this copy -- confirm.
            # supply = max(0, sunshine - cloud coverage, each with a small random jitter) * solarWattage, in watts
            self.participant.currentInHouseSupply = Q_(max(0.0, (self.sunshine + uniform(-0.05, 0.05) - self.cloudCoverage +
                                                                 uniform(-0.05, 0.05))) * self.participant.solarWattage,
                                                       ureg.watt)
    
            logger.info(">>>> setting current weather on " + self.participant.publicIP.__str__() + "with sunshine" +
    
                        self.sunshine.__str__() + "and cloud coverage " + self.cloudCoverage.__str__() + "producing " +
                        self.participant.currentInHouseSupply.__str__() + " watt")
    
    
    if __name__ == "__main__":
        # command line interface: every option is required
        parser = argparse.ArgumentParser(description='Participant in the Grid')
        for flag, value_type, help_text in (
            ('--con', str, 'the cable that the participant is connected to'),
            ('--cc', float, 'initial cloud coverage'),
            ('--sh', float, 'initial sunshine'),
            ('--dmd', float, 'initial demand'),
            ('--solar', int, 'solar wattage'),
        ):
            parser.add_argument(flag, type=value_type, required=True, help=help_text)
        args = parser.parse_args()

        # jobs live in memory only
        jobstores = {'default': MemoryJobStore()}
        scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
        logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler()])

        @asynccontextmanager
        async def lifespan(app: FastAPI):
            # the scheduler runs exactly as long as the FastAPI app
            scheduler.start()
            yield
            scheduler.shutdown()

        fast_app = FastAPI(lifespan=lifespan)

        # the participant announces itself and fetches its line during construction
        part = Participant(
            nid="test_network",
            dht_ep={ipaddress.IPv4Address('172.20.0.2')},
            scd=scheduler,
            init_demand=args.dmd,
            solar_wattage=args.solar,
            con_ip=ipaddress.IPv4Address(args.con),
        )
        fast_app.include_router(part.fastRouter)

        # the weather registers its jobs on the shared scheduler
        w = Weather(part, args.cc, args.sh, scheduler)

        uvicorn.run(fast_app, host=get_pub_ip(), port=8000)