diff --git a/exchange.py b/exchange.py index 6a12fa4a8c065fb5ad122ebffd3914a252c7a2d6..46bf864f95d6c896624b36791efabe86f916d8bb 100644 --- a/exchange.py +++ b/exchange.py @@ -1,13 +1,16 @@ +import ipaddress + from pint import UnitRegistry import time from datetime import datetime, timedelta class Exchange: - def __init__(self, pk: bytes, p, ppw: float, extime: datetime): + def __init__(self, pk: bytes, p, ppw: float, extime: datetime, ip: ipaddress.IPv4Address): self.ureg = None self.Q_ = None self.publicKey: bytes = pk + self.publicIP: ipaddress.IPv4Address = ip self.timestamp: int = time.time_ns() self.executiontime: datetime = extime self.__isActive: bool = False @@ -68,5 +71,5 @@ class Exchange: return self.executiontime.timestamp() < int(now.timestamp()) def __repr__(self): - return self.pubicKey.__repr__() + " " + str(self.power) + " " + str(self.pricePerWatt) + " " + str( + return self.publicKey.__repr__() + " " + str(self.power) + " " + str(self.pricePerWatt) + " " + str( self.executiontime) \ No newline at end of file diff --git a/main.py b/main.py index 98143fbdbc1c83d9280d6a7577064b6b49afe9ca..2244122daebffe9fab8530e0af1223aad85045de 100644 --- a/main.py +++ b/main.py @@ -161,6 +161,7 @@ result = d1.verify(pk, msg, sig) #print(result) ''' +''' from trade import Trade from offer import Offer from request import Request @@ -173,11 +174,12 @@ rp1 = RemoteParticipant(bytes(123), ipaddress.IPv4Address('1.2.3.4'), [o1], set( rp2 = RemoteParticipant(bytes(123), ipaddress.IPv4Address('1.2.3.4'), [r1], set()) rou = Route(rp1, rp2, rx.PyGraph(), 0.1) -t1 = Trade(o1, r1, 0.1, 0.1, rou, datetime.datetime.now()) +t1 = Trade(o1, r1, 0.1, 0.1, rou, datetime.datetime.now(), time.time_ns()) j = jsonpickle.encode(t1) t2 = jsonpickle.decode(j) print(t2) +''' jobstores = { 'default': MemoryJobStore() diff --git a/offer.py b/offer.py index 383a825936e9afab334ca01da53d1bf291c7c223..b10bbc0f2d5286189714af6ca23c845df6bbb250 100644 --- a/offer.py +++ b/offer.py @@ -1,8 +1,9 @@ import exchange from datetime import datetime +import ipaddress class Offer(exchange.Exchange): - def __init__(self, pk: bytes, p: float, ppw: float, extime: datetime): - exchange.Exchange.__init__(self, pk, p, ppw, extime) + def __init__(self, pk: bytes, p: float, ppw: float, extime: datetime, ip: ipaddress.IPv4Address): + exchange.Exchange.__init__(self, pk, p, ppw, extime, ip) diff --git a/participant.py b/participant.py index 276d25cb1370ec3e862b325d87e664c8b7fa152b..9420d6edb96ce5073ec7d36f9b30494fd078d223 100644 --- a/participant.py +++ b/participant.py @@ -41,7 +41,7 @@ from remotetransformer import RemoteTransformer, SignedRemoteTransformer from remoteparticipant import RemoteParticipant -from fastapi import APIRouter, FastAPI, Body +from fastapi import APIRouter, FastAPI, Body, HTTPException import logging logger = logging.getLogger(__name__) @@ -93,6 +93,7 @@ class Participant: self.fastRouter.add_api_route("/getgridpng", self.getGridPNG, methods=["GET"]) self.fastRouter.add_api_route("/announceTrade", self.announceTrade, methods=["POST"]) self.fastRouter.add_api_route("/getTradeToSign", self.getTradeToSign, methods=["GET"]) + self.fastRouter.add_api_route("/signTrade", self.signTrade, methods=["POST"]) # announce self on dht asyncio.run(self.dht_startup()) # self.loop.run_in_executor(None, self.dht_startup) @@ -107,8 +108,9 @@ class Participant: asyncio.run(self.discoverGrid()) # generate and add GenesisTrade gt = GenesisTrade(self.publicKey, self.as_remote_participant()) - gt.sig_off = self.dil.sign_with_input(self.__secretKey, gt.offer.__repr__().encode()) - gt.sig_req = self.dil.sign_with_input(self.__secretKey, gt.request.__repr__().encode()) + with redirect_stdout(None): + gt.sig_off = self.dil.sign_with_input(self.__secretKey, gt.offer.__repr__().encode()) + gt.sig_req = self.dil.sign_with_input(self.__secretKey, gt.request.__repr__().encode()) self.__tradeHistory.append(gt) # set up scheduling for tasks @@ -300,6 +302,48 @@ class Participant: return jsonpickle.encode(self.__tradeHistory[-1]) # return the last trade in the history, this should only # happen when the participant is new and has no trades beside the GenesisTrade + async def signTrade(self, body=Body()): + trade: Trade = jsonpickle.decode(body["trade"]) + public_key = jsonpickle.decode(body["public_key"]) + signature = jsonpickle.decode(body["signature"]) + + # find the matching trade and add the signature to the sig_chain + logger.info("===> matching trade and adding signature") + for t in self.availableTrades: + if t == trade: + if len(t.sig_chain[0]) == 0: + t.sig_chain[0].append((public_key, signature)) + return jsonpickle.encode(t) + if len(t.sig_chain[1]) == 0 and t.sig_chain[0][0][0] != public_key: # not adding the same signature twice + t.sig_chain[1].append((public_key, signature)) + return jsonpickle.encode(t) + for t in self.TradeQueue: + if t == trade: + if len(t.sig_chain[0]) == 0: + t.sig_chain[0].append((public_key, signature)) + return jsonpickle.encode(t) + if len(t.sig_chain[1]) == 0 and t.sig_chain[0][0][0] != public_key: + t.sig_chain[1].append((public_key, signature)) + return jsonpickle.encode(t) + for t in self.activeTrades: + if t == trade: + if len(t.sig_chain[0]) == 0: + t.sig_chain[0].append((public_key, signature)) + return jsonpickle.encode(t) + if len(t.sig_chain[1]) == 0 and t.sig_chain[0][0][0] != public_key: + t.sig_chain[1].append((public_key, signature)) + return jsonpickle.encode(t) + for t in self.__tradeHistory: + if t == trade: + if len(t.sig_chain[0]) == 0: + t.sig_chain[0].append((public_key, signature)) + return jsonpickle.encode(t) + if len(t.sig_chain[1]) == 0 and t.sig_chain[0][0][0] != public_key: + t.sig_chain[1].append((public_key, signature)) + return jsonpickle.encode(t) + logger.info("===> no suitable trade found") + raise HTTPException(status_code=404, detail="Trade not found") + async def announceTrade(self, body=Body()): # here a Trade will be announced from another participant which # tries to match their offer with the request of this participant @@ -314,9 +358,17 @@ class Participant: if not self.dil.verify(pk, i.line.__repr__().encode(), i.signature): logger.info("===> Signature not valid") return "Error: Signature not valid" + if isinstance(i, SignedRemoteTransformer): + response = await client.get("http://" + i.transformer.publicIP.__str__() + ":8000/getPublicKey") + pk = jsonpickle.decode(response.json()) + # logger.info("--------------->" + i.trafo.__repr__().encode().__str__()) + if not self.dil.verify(pk, i.transformer.__repr__().encode(), i.signature): + logger.info("===> Signature not valid") + return "Error: Signature not valid" with redirect_stdout(None): trade.sig_offer = self.dil.sign_with_input(self.__secretKey, trade.offer.__repr__().encode()) logger.info("===> confirmed trade") + self.availableTrades.append(trade) return jsonpickle.encode(trade) async def produceNextTrade(self): @@ -352,7 +404,7 @@ class Participant: # additionally, all offers will be for the next minute, when the system is able to take variable # consumers (e.g. electric cars and heat pumps) into account, this can be adapted to request # bigger amounts of energy in a foreseeable fashion - o = Offer(self.publicKey, power, 0.3, next_minute()) # price should be dynamic, no time :D + o = Offer(self.publicKey, power, 0.3, next_minute(), self.publicIP) # price should be dynamic, no time :D self.ExchangeQueue.append(o) logger.info("===> created offer on " + self.publicIP.__str__()) else: @@ -362,7 +414,7 @@ class Participant: # additionally, all requests will be for the next minute, when the system is able to take variable # consumers (e.g. electric cars and heat pumps) into account, this can be adapted to request # bigger amounts of energy in a foreseeable fashion - r = Request(self.publicKey, power, 0.3, next_minute()) + r = Request(self.publicKey, power, 0.3, next_minute(), self.publicIP) self.ExchangeQueue.append(r) logger.info("===> created request on " + self.publicIP.__str__()) @@ -445,7 +497,7 @@ class Participant: # the offer is big enough, use price from offer as this is the price that will be paid # and again, only trades for the next minute are possible at this point t = Trade(o, self.ExchangeQueue[0], self.ExchangeQueue[0].power, o.pricePerWatt, r, - next_minute()) + next_minute(), time.time_ns()) # now announce the route to all connections involved signedt = await self.announceRoute(t) # sign the trade to mark it as valid @@ -468,29 +520,66 @@ class Participant: if confirmedt.request == self.ExchangeQueue[0]: # own request is fulfilled by trade self.availableTrades.append(confirmedt) self.ExchangeQueue.pop(0) - - - - - + signable_trade1: Trade = await self.findConfirmableTrade() + # verify the route of the trade + for i in signable_trade1.singedRoute: + if isinstance(i, SignedRemoteLine): + response = await client.get( + "http://" + i.line.publicIP.__str__() + ":8000/getPublicKey") + pk = jsonpickle.decode(response.json()) + # logger.info("--------------->" + i.line.__repr__().encode().__str__()) + if not self.dil.verify(pk, i.line.__repr__().encode(), i.signature): + logger.info("===> Signature not valid") + return "Error: Signature not valid" + if isinstance(i, SignedRemoteTransformer): + response = await client.get( + "http://" + i.transformer.publicIP.__str__() + ":8000/getPublicKey") + pk = jsonpickle.decode(response.json()) + # logger.info("--------------->" + i.trafo.__repr__().encode().__str__()) + if not self.dil.verify(pk, i.transformer.__repr__().encode(), i.signature): + logger.info("===> Signature not valid") + return "Error: Signature not valid" + + # route seems to be fine, now we can sign + with redirect_stdout(None): + sig = self.dil.sign_with_input(self.__secretKey, + signable_trade1.__repr__().encode()) + response = await client.post( + "http://" + signable_trade1.offer.publicIP.__str__() + ":8000/signTrade", + json={"trade": jsonpickle.encode(signable_trade1), + "public_key": jsonpickle.encode(self.publicKey), + "signature": jsonpickle.encode(sig)}) + if response.status_code == 200: + logger.info("===> received confirmation from " + signable_trade1.offer.publicIP.__str__()) + signed_trade1 = jsonpickle.decode(response.json()) + confirmedt.confirmed_trades.append(signed_trade1) + if response.status_code == 404: + logger.info("===> no trade returned from " + signable_trade1.offer.publicIP.__str__()) + + logger.info("-------------------------------------") + + # confirmedt.confirmed_trades.append() else: logger.info("===> no one to trade with on " + self.publicIP.__str__()) return - # when no partner can be found, this will be tried again after the next updateCurrentPower - # when a request is about to "time out" because no partner has been found, the required power of - # the participant should be reduced to avoid a blackout. in a fully digitalized grid the participant - # would be able to request power from the grid - # in an overlapping period the participant simply takes the power from the grid via the old counter - - async def findConfirmableTrade(self) -> Trade: - possibleTrade = None + # when no partner can be found, this will be tried again after the next updateCurrentPower + # when a request is about to "time out" because no partner has been found, the required power of + # the participant should be reduced to avoid a blackout. in a fully digitalized grid the participant + # would be able to request power from the grid operator at a high cost + # in an overlapping period the participant simply takes the power from the grid via the old counter + + async def findConfirmableTrade(self): + possible_trade = None for rp in self.remoteParticipants: async with httpx.AsyncClient() as client: response = await client.get("http://" + rp.publicIP.__str__() + ":8000/getTradeToSign") - possibleTrade = jsonpickle.decode(response.json()) - # TODO: continue here - + trade = jsonpickle.decode(response.json()) + if isinstance(trade, Trade): # taking the first actual trade + return trade + if isinstance(trade, GenesisTrade): # if all thats available are genesistrades, we take one + possible_trade = trade + return possible_trade async def updateCurrentPower(self): self.__availablePower = self.currentInHouseSupply - self.__currentInHouseDemand @@ -507,10 +596,6 @@ class Participant: logger.info("===> trying to find an offer on " + self.publicIP.__str__()) await self.findOffer() - if len(self.availableTrades) > 0: - conftrade: Trade = await self.findConfirmableTrade() - - class Weather: # Weather might be better done using this but time is running out diff --git a/request.py b/request.py index 61fae780108446b024bd34844a3c5c2c8fdcabe5..69187b1ce55d4fa66a169e6b53cfa57c9a694b7c 100644 --- a/request.py +++ b/request.py @@ -1,10 +1,12 @@ +import ipaddress + import exchange from datetime import datetime class Request(exchange.Exchange): - def __init__(self, pk: bytes, p: float, ppw: float, extime: datetime): - exchange.Exchange.__init__(self, pk, p, ppw, extime) + def __init__(self, pk: bytes, p: float, ppw: float, extime: datetime, ip: ipaddress.IPv4Address): + exchange.Exchange.__init__(self, pk, p, ppw, extime, ip) def __eq__(self, other): return self.publicKey == other.publicKey and self.executiontime == other.executiontime diff --git a/trade.py b/trade.py index e504aabf04ef25bee665514616e776f0479a0425..51cb315d845f6054784923cda0c28aa031101948 100644 --- a/trade.py +++ b/trade.py @@ -12,10 +12,10 @@ from remoteparticipant import RemoteParticipant class Trade: def __init__(self, off: offer.Offer, req: request.Request, pwr, ppw: float, rou: route.Route, - extime: datetime.datetime): + extime: datetime.datetime, ts: int): self.ureg = None self.Q_ = None - self.__timestamp = time.time_ns() + self.__timestamp = ts self.offer: offer.Offer = off self.request: request.Request = req self.route = rou @@ -39,7 +39,10 @@ class Trade: def __repr__(self): return (self.offer.__repr__() + " " + self.request.__repr__() + " " + str(self.power) + " " + - str(self.__pricePerWatt) + " " + str(self.__executionTime)) + str(self.__pricePerWatt) + " " + str(self.__executionTime) + " " + str(self.__timestamp)) + + def __eq__(self, other): + return self.offer.publicKey == other.offer.publicKey and self.request.publicKey == other.request.publicKey and self.__timestamp == other.__timestamp def remove_ureg(self): self.power = float(self.power.magnitude) @@ -54,7 +57,7 @@ class GenesisTrade(Trade): def __init__(self, public_key: bytes, remote_me: RemoteParticipant): g = rustworkx.PyGraph() g.add_node(remote_me) - super().__init__(offer.Offer(public_key, 0, 0, datetime.datetime.now()), - request.Request(public_key, 0, 0, datetime.datetime.now()), 0, 0, + super().__init__(offer.Offer(public_key, 0, 0, datetime.datetime.now(), remote_me.publicIP), + request.Request(public_key, 0, 0, datetime.datetime.now(), remote_me.publicIP), 0, 0, route.Route(remote_me, remote_me, g, 0), - datetime.datetime.now()) + datetime.datetime.now(), time.time_ns())