diff --git a/connection.py b/connection.py index 37f8a61e0142b343bccbbb615f3b2f786311c21d..34916ebcd22f4d5eb6d81e375bac98416a8c6e1f 100644 --- a/connection.py +++ b/connection.py @@ -22,7 +22,6 @@ from util import get_pub_ip logger = logging.getLogger(__name__) - # TODO: is Connection (cable or transformer) involved in confirming trades? can this be be prevented? class Connection: @@ -39,8 +38,13 @@ class Connection: self.usedCapacity = 0 self.loss = 0.01 # this is a wrong assumption, actual line loss is depending on the load + self.fastRouter.add_api_route("/getPublicKey", self.getPublicKey, methods=["GET"]) + def checkCapacity(self, rc: RemoteConnection): if self.availableCapacity - self.usedCapacity > rc.tradeCapacity: self.usedCapacity += rc.tradeCapacity + rc.loss return True return False + + async def getPublicKey(self): + return jsonpickle.encode(self.publicKey) \ No newline at end of file diff --git a/exchange.py b/exchange.py index 58b747e9ca0849cccd7f3323480598ed5053cc44..6a12fa4a8c065fb5ad122ebffd3914a252c7a2d6 100644 --- a/exchange.py +++ b/exchange.py @@ -4,22 +4,26 @@ from datetime import datetime, timedelta class Exchange: - def __init__(self, pk: bytes, p: float, ppw: float, extime: datetime): + def __init__(self, pk: bytes, p, ppw: float, extime: datetime): self.ureg = None self.Q_ = None - self.pubicKey: bytes = pk + self.publicKey: bytes = pk self.timestamp: int = time.time_ns() self.executiontime: datetime = extime self.__isActive: bool = False self.power = p self.pricePerWatt = ppw # this is in EUR, no new cryptocurrency should be created for this - self.__sig: bytes def set_ureg(self, ureg: UnitRegistry) -> None: self.ureg = ureg self.Q_ = ureg.Quantity self.power = self.Q_(self.power, ureg.watt) + def remove_ureg(self): + self.power = float(self.power.magnitude) + self.ureg = None + self.Q_ = None + def activate(self) -> None: self.__isActive = True @@ -62,3 +66,7 @@ class Exchange: def is_old(self) -> bool: now = datetime.now() return self.executiontime.timestamp() < int(now.timestamp()) + + def __repr__(self): + return self.pubicKey.__repr__() + " " + str(self.power) + " " + str(self.pricePerWatt) + " " + str( + self.executiontime) \ No newline at end of file diff --git a/line.py b/line.py index 2f45ee6b8ea46e2f70924447bd53bb209726bb63..a404c0ddf661503ca3862576eec563ce5901ad13 100644 --- a/line.py +++ b/line.py @@ -1,12 +1,13 @@ import datetime import ipaddress +import time import httpx from apscheduler.schedulers.asyncio import AsyncIOScheduler import argparse import asyncio -from contextlib import asynccontextmanager -from fastapi import APIRouter, FastAPI +from contextlib import asynccontextmanager, redirect_stdout +from fastapi import APIRouter, FastAPI, Body import jsonpickle import uvicorn from apscheduler.jobstores.memory import MemoryJobStore @@ -25,10 +26,10 @@ class Line(Connection): def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler): super().__init__(cap, conips, sched) self.adjacentConnections: set[RemoteTransformer | RemoteParticipant] = set() - self.lastSignedLine: dict[datetime.datetime, SignedRemoteLine] = dict() + self.lastSignedLine: dict[float, SignedRemoteLine] = dict() # adding fastapi endpoints self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteLineJSON, methods=["GET"]) - self.fastRouter.add_api_route("/sign/{time}", self.sign, methods=["POST"]) + self.fastRouter.add_api_route("/sign/{extime}", self.sign, methods=["POST"]) # setting up scheduling run_date = datetime.datetime.now() + datetime.timedelta(hours=1, seconds=5) # +1 hour because timezones suck hard @@ -49,28 +50,33 @@ class Line(Connection): self.usedCapacity, self.adjacentConnections, self.loss) return jsonpickle.encode(rl) - async def sign(self, time: datetime.datetime, rljson): - print(rljson) - rl = jsonpickle.decode(rljson) + async def sign(self, extime: float, body=Body()): + rl = jsonpickle.decode(body["remote"]) + logger.info("===> Line: " + self.publicIP.__str__() + " received sign request for " + rl.publicIP.__str__()) if rl.publicKey == self.publicKey: # check if the rl actually is me if self.checkCapacity(rl): - print(self.secretKey) - time = time.replace(second=0, microsecond=0) # we are scheduling in minute intervals - if time not in self.lastSignedLine.keys(): + if extime not in self.lastSignedLine.keys(): origin = SignedRemoteLine( RemoteLine(self.publicIP, self.publicKey, self.availableCapacity, self.usedCapacity, self.adjacentConnections, self.loss), None) origin.isOrigin = True - origin.signature = self.dil.sign_with_input(self.secretKey, origin.__str__().encode()) + with redirect_stdout(None): + origin.signature = self.dil.sign_with_input(self.secretKey, origin.__repr__().encode()) result = SignedRemoteLine(rl, origin) - result.signature = self.dil.sign_with_input(self.secretKey, result.__str__().encode()) - self.lastSignedLine[time] = result + with redirect_stdout(None): + result.signature = self.dil.sign_with_input(self.secretKey, rl.__repr__().encode()) + + # logger.info("------------------->" + rl.__repr__().encode().__str__()) + self.lastSignedLine[extime] = result + logger.info("===> Line: " + self.publicIP.__str__() + " signed first " + rl.publicIP.__str__()) return jsonpickle.encode(result) else: # if there has been a route announced before, add the previous to the new one - result = SignedRemoteLine(rl, self.lastSignedLine[time]) - result.signature = self.dil.sign_with_input(self.secretKey, result.__str__().encode()) - self.lastSignedLine[time] = result + result = SignedRemoteLine(rl, self.lastSignedLine[extime]) + with redirect_stdout(None): + result.signature = self.dil.sign_with_input(self.secretKey, rl.__repr__().encode()) + self.lastSignedLine[extime] = result + logger.info("===> Line: " + self.publicIP.__str__() + " signed " + rl.publicIP.__str__()) return jsonpickle.encode(result) else: # TODO: Rest Error handling return "Not enough capacity" # better handling here would be nice diff --git a/main.py b/main.py index 36472f86777624001976c150bde85b846df8b77c..98143fbdbc1c83d9280d6a7577064b6b49afe9ca 100644 --- a/main.py +++ b/main.py @@ -11,7 +11,7 @@ import jsonpickle import numpy import uvicorn import logging -from fastapi import FastAPI, Path, APIRouter +from fastapi import FastAPI, Path, APIRouter, Body from contextlib import asynccontextmanager from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.jobstores.memory import MemoryJobStore @@ -54,6 +54,7 @@ def get_ip(): return IP +''' class CustomDijkstraVisitor(DijkstraVisitor): def __init__(self): super().__init__() @@ -62,7 +63,6 @@ class CustomDijkstraVisitor(DijkstraVisitor): def discover_vertex(self, vertex, distance): print(f"Discovered vertex: {vertex} with distance: {distance}") - def examine_edge(self, edge): for e in self.examined_edges: if edge[0] == e[1] and edge[1] == e[0]: @@ -83,6 +83,7 @@ visitor = CustomDijkstraVisitor() # Perform Dijkstra's search rx.dijkstra_search(graph, [0], lambda _: 1.0, visitor) +''' ''' class GraphNode: @@ -148,7 +149,7 @@ e1 = p1 * t1 #e1.ito(ureg.kilowatt_hour) print(e1) ''' - +''' d1 = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"]) pk, sk = d1.keygen([random.randint(0, numpy.iinfo('uint32').max), random.randint(0, numpy.iinfo('uint32').max), random.randint(0, numpy.iinfo('uint32').max), @@ -156,8 +157,27 @@ pk, sk = d1.keygen([random.randint(0, numpy.iinfo('uint32').max), random.randint msg = b"this is a message" sig = d1.sign_with_input(sk, msg) result = d1.verify(pk, msg, sig) -print(sk) -print(result) +#print(sk) +#print(result) +''' + +from trade import Trade +from offer import Offer +from request import Request +from route import Route +from remoteparticipant import RemoteParticipant + +o1 = Offer(bytes(123), 0.1, 0.1, datetime.datetime.now()) +r1 = Request(bytes(123), 0.1, 0.1, datetime.datetime.now()) +rp1 = RemoteParticipant(bytes(123), ipaddress.IPv4Address('1.2.3.4'), [o1], set()) +rp2 = RemoteParticipant(bytes(123), ipaddress.IPv4Address('1.2.3.4'), [r1], set()) +rou = Route(rp1, rp2, rx.PyGraph(), 0.1) + +t1 = Trade(o1, r1, 0.1, 0.1, rou, datetime.datetime.now()) + +j = jsonpickle.encode(t1) +t2 = jsonpickle.decode(j) +print(t2) jobstores = { 'default': MemoryJobStore() @@ -181,21 +201,30 @@ async def lifespan(app: FastAPI): fast_app = FastAPI(lifespan=lifespan) -# start the server +async def test(t=Body()): + print(t) + return t + + +fast_app.add_api_route("/test", test, methods=["POST"]) + +# start the server +''' if __name__ == "__main__": @scheduler.scheduled_job('interval', seconds=100) def scheduled_job_1(): logger.info("hello from " + get_ip()) + print(datetime.datetime.now()) l = line.Line(cap=100, conips={ipaddress.IPv4Address('0.0.0.0')}, sched=scheduler) rl = asyncio.run(l.asRemoteLineJSON()) print(rl) - #part = participant.Participant(nid="test_network", dht_ep={ipaddress.IPv4Address( + # part = participant.Participant(nid="test_network", dht_ep={ipaddress.IPv4Address( # '172.20.0.2')}, con_ip=ipaddress.IPv4Address('172.20.1.1')) # Network_ID should be a Hash for real world use, provide ip adresses of dht endpoints for first discovery - #scheduler.add_job(part.requestremoteParticipants, 'interval', seconds=20, id='1', name='requestRemoteParticipants') + # scheduler.add_job(part.requestremoteParticipants, 'interval', seconds=20, id='1', name='requestRemoteParticipants') fast_app.include_router(l.fastRouter) uvicorn.run(fast_app, host=get_ip(), port=8000) - +''' diff --git a/offer.py b/offer.py index 5fc8f302935204a1dd7d8318dfda8e91ba3a3609..383a825936e9afab334ca01da53d1bf291c7c223 100644 --- a/offer.py +++ b/offer.py @@ -5,3 +5,4 @@ from datetime import datetime class Offer(exchange.Exchange): def __init__(self, pk: bytes, p: float, ppw: float, extime: datetime): exchange.Exchange.__init__(self, pk, p, ppw, extime) + diff --git a/participant.py b/participant.py index 07352a9b7f6d628b1e679e30cfb57fbc393027bd..276d25cb1370ec3e862b325d87e664c8b7fa152b 100644 --- a/participant.py +++ b/participant.py @@ -2,7 +2,7 @@ from pint import UnitRegistry, set_application_registry import locale import time -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager, redirect_stdout from datetime import datetime, timedelta from collections import defaultdict @@ -34,14 +34,14 @@ from route import Route from offer import Offer from request import Request from exchange import Exchange -from trade import Trade +from trade import Trade, GenesisTrade from remoteconnection import RemoteConnection -from remoteline import RemoteLine -from remotetransformer import RemoteTransformer +from remoteline import RemoteLine, SignedRemoteLine +from remotetransformer import RemoteTransformer, SignedRemoteTransformer from remoteparticipant import RemoteParticipant -from fastapi import APIRouter, FastAPI +from fastapi import APIRouter, FastAPI, Body import logging logger = logging.getLogger(__name__) @@ -54,7 +54,7 @@ class Participant: def __init__(self, nid: str, dht_ep: set[ipaddress.IPv4Address], con_ip: ipaddress.IPv4Address, scd: AsyncIOScheduler, init_demand: float, solar_wattage: int, ip: ipaddress = get_pub_ip()): self.dil = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"]) - self.__secretKey, self.publicKey = self.dil.keygen( + self.publicKey, self.__secretKey = self.dil.keygen( [randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max)]) self.fastRouter = APIRouter() @@ -69,10 +69,13 @@ class Participant: self.gridEdgeIndexes: dict[bytes:set[int]] = defaultdict(set) self.knownIPs: set[ipaddress.IPv4Address] = set() # known IPs from other participants self.remoteParticipants: set[RemoteParticipant] = set() - self.ExchangeQueue: list[Exchange] = list() # own exchanges for future turns - self.activeTrades: set[Trade] = set() # own active trades for this turn - self.__tradeHistory: list[Trade] = [] # every own past trade - self.solarWattage = solar_wattage = solar_wattage + self.ExchangeQueue: list[Exchange] = list() # own available exchanges for future turns + self.availableTrades: list[Trade] = list() # own available trades for future turns waiting for confirmation + self.TradeQueue: list[Trade] = list() # fully confirmed trades waiting to be enacted + self.activeTrades: list[Trade] = list() # own active trades for this turn + self.__tradeHistory: list[Trade] = list() # every own past trade + self.__accountBalance: float = 0.0 + self.solarWattage = solar_wattage self.__availablePower = Q_(0, ureg.watt) # possible power exchange with the grid self.__energyOffset = Q_(0, ureg.watt * ureg.hour) # energy offset for the next round self.__currentInHouseDemand = Q_(init_demand, ureg.watt) # real time demand from household appliances, storage @@ -88,6 +91,8 @@ class Participant: self.fastRouter.add_api_route("/getOffersJSON", self.getOffersJSON, methods=["GET"]) self.fastRouter.add_api_route("/get_knownIPs", self.get_knownIPs, methods=["GET"]) self.fastRouter.add_api_route("/getgridpng", self.getGridPNG, methods=["GET"]) + self.fastRouter.add_api_route("/announceTrade", self.announceTrade, methods=["POST"]) + self.fastRouter.add_api_route("/getTradeToSign", self.getTradeToSign, methods=["GET"]) # announce self on dht asyncio.run(self.dht_startup()) # self.loop.run_in_executor(None, self.dht_startup) @@ -100,6 +105,12 @@ class Participant: time.sleep(10) logger.info("~~~> starting discoverGrid") # participants will be added after each request_remote_participants asyncio.run(self.discoverGrid()) + # generate and add GenesisTrade + gt = GenesisTrade(self.publicKey, self.as_remote_participant()) + gt.sig_off = self.dil.sign_with_input(self.__secretKey, gt.offer.__repr__().encode()) + gt.sig_req = self.dil.sign_with_input(self.__secretKey, gt.request.__repr__().encode()) + self.__tradeHistory.append(gt) + # set up scheduling for tasks self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=15, id='1', name='requestRemoteParticipants') @@ -207,6 +218,8 @@ class Participant: url = 'http://' + rp.publicIP.__str__() + ':8000/allRemoteParticipantsJSON' async with httpx.AsyncClient() as client: response = await client.get(url) + # this works the first time round but starts throwing errors after, probably because of + # timing issues, only appears when this task runs during route announcement, not critical remoteparticipants: set[RemoteParticipant] = jsonpickle.decode(response.json()) # some validation of remoteparticipants should happen, not in scope of this thesis all_remoteparticipants.update(remoteparticipants) @@ -277,6 +290,35 @@ class Participant: """ return HTMLResponse(content=html_content) + async def getTradeToSign(self): + if len(self.availableTrades) > 0: # prefer trades that actually need confirmation + return jsonpickle.encode(self.availableTrades[0]) + if len(self.TradeQueue) > 0: # allow already confirmed trades to be signed + return jsonpickle.encode(self.TradeQueue[-1]) + if len(self.activeTrades) > 0: # allow active trade to be signed again (this should be avoided) + return jsonpickle.encode(self.activeTrades[-1]) + return jsonpickle.encode(self.__tradeHistory[-1]) # return the last trade in the history, this should only + # happen when the participant is new and has no trades beside the GenesisTrade + + async def announceTrade(self, body=Body()): + # here a Trade will be announced from another participant which + # tries to match their offer with the request of this participant + + trade: Trade = jsonpickle.decode(body["trade"]) + for i in trade.singedRoute: + async with httpx.AsyncClient() as client: + if isinstance(i, SignedRemoteLine): + response = await client.get("http://" + i.line.publicIP.__str__() + ":8000/getPublicKey") + pk = jsonpickle.decode(response.json()) + # logger.info("--------------->" + i.line.__repr__().encode().__str__()) + if not self.dil.verify(pk, i.line.__repr__().encode(), i.signature): + logger.info("===> Signature not valid") + return "Error: Signature not valid" + with redirect_stdout(None): + trade.sig_offer = self.dil.sign_with_input(self.__secretKey, trade.offer.__repr__().encode()) + logger.info("===> confirmed trade") + return jsonpickle.encode(trade) + async def produceNextTrade(self): # here we can take into account if there is a variable consumer in the house (e.g. electric car or heat pump) @@ -295,8 +337,13 @@ class Participant: logger.info("===> checking if new exchange has to be created on " + self.publicIP.__str__()) - for t in self.ExchangeQueue: # only produce a new exchange if there is no exchange for the next minute - if t.is_min_next(): + for e in self.ExchangeQueue: # only produce a new exchange if there is no exchange for the next minute + if e.is_min_next(): + ex_next_min = True + + for t in self.TradeQueue: + # if an exchange already has been matched for the next minute, no new exchange is needed + if t.request.is_min_next() and t.offer.is_min_next(): ex_next_min = True if power > Q_(0, ureg.watt) and not ex_next_min: @@ -319,6 +366,9 @@ class Participant: self.ExchangeQueue.append(r) logger.info("===> created request on " + self.publicIP.__str__()) + if ex_next_min: + logger.info("===> exchange or trade for next minute already exists on " + self.publicIP.__str__()) + async def findRoute(self, start: RemoteParticipant, end: RemoteParticipant) -> Route: s = self.gridNodeIndexes[start.publicKey] e = self.gridNodeIndexes[end.publicKey] @@ -327,19 +377,18 @@ class Participant: graph = self.grid.subgraph(path[e]) return Route(start, end, graph, routeloss) - async def announceRoute(self, t: Trade): + async def announceRoute(self, t: Trade) -> Trade: class GridVisitor(DijkstraVisitor): def __init__(self, trade: Trade): super().__init__() self.examined_edges = set() self.trade: Trade = trade + self.to_be_signed = list() def discover_vertex(self, vertex, distance): - with httpx.Client as client: - response = client.post("http://" + self.trade.route.graph[vertex].publicIP.__str__() + ":8000/sign/" - + self.trade.offer.executiontime.__str__() + "?rljson=" + jsonpickle.encode( - self.trade.route.graph[vertex])) logger.info(f"===> Discovered Trafo: {self.trade.route.graph[vertex].publicIP}") + self.to_be_signed.append(self.trade.route.graph[vertex]) + self.trade.signedCounter += 1 def examine_edge(self, edge): for e in self.examined_edges: @@ -349,11 +398,29 @@ class Participant: logger.info(f"===>Examining Line: {edge[2].publicIP}") self.examined_edges.add(edge) + self.to_be_signed.append(edge[2]) + self.trade.signedCounter += 1 visitor = GridVisitor(t) # walk the route and announce the route to all grid elements involved rx.dijkstra_search(t.route.graph, [self.gridNodeIndexes[t.route.start.publicKey]], lambda _: 1.0, visitor) + for r in visitor.to_be_signed: + if isinstance(r, RemoteLine) or isinstance(r, RemoteTransformer): + async with httpx.AsyncClient() as client: + logger.info("===> announcing route to " + r.publicIP.__str__()) + response = await client.post("http://" + r.publicIP.__str__() + ":8000/sign/" + + int(t.offer.executiontime.timestamp()).__str__(), + json={"remote": jsonpickle.encode(r)}) + # TODO: Continue here + signed = jsonpickle.decode(response.json()) + # some verification should happen here to handle malicious participants + t.singedRoute.append(signed) + + logger.info("===> finished signing route for " + len(t.singedRoute).__str__() + " of " + + (t.signedCounter - 2).__str__() + " participants") # - 2 because start and end are not signed + return t + async def findOffer(self, discarded_rps=frozenset()): closest = self.as_remote_participant() # this is just to have a rp to compare to for rp in self.remoteParticipants: @@ -369,6 +436,7 @@ class Participant: o.set_ureg(ureg) # update the ureg of the offer for pint to work if o.is_min_next(): # once again, only trying to find an offer for the next minute if o.power >= self.ExchangeQueue[0].power: + logger.info("===> found offer on " + closest.publicIP.__str__()) # the offer is big enough to fulfill the request # now find the route to the offering participant r = await self.findRoute(self.as_remote_participant(), closest) @@ -378,11 +446,34 @@ class Participant: # and again, only trades for the next minute are possible at this point t = Trade(o, self.ExchangeQueue[0], self.ExchangeQueue[0].power, o.pricePerWatt, r, next_minute()) - t.set_ureg(ureg) # now announce the route to all connections involved - await self.announceRoute(t) + signedt = await self.announceRoute(t) + # sign the trade to mark it as valid + with redirect_stdout(None): + signedt.sig_req = self.dil.sign_with_input(self.__secretKey, t.__repr__().encode()) + signedt.remove_ureg() + # route made problems with serialisation and is not really needed because signed route + # holds the important information, this should be fixed but time is running out + signedt.route = None + response = await client.post("http://" + closest.publicIP.__str__() + ":8000/announceTrade", + json={"trade": jsonpickle.encode(signedt)}) + signedt.set_ureg(ureg) + logger.info("===> announced trade to " + closest.publicIP.__str__()) + + confirmedt = jsonpickle.decode(response.json()) + + if isinstance(confirmedt, Trade): + # offering participant will only return the trade if its valid + logger.info("===> received confirmation from " + closest.publicIP.__str__()) + if confirmedt.request == self.ExchangeQueue[0]: # own request is fulfilled by trade + self.availableTrades.append(confirmedt) + self.ExchangeQueue.pop(0) + + + + + - logger.info("===> found offer on " + closest.publicIP.__str__()) else: logger.info("===> no one to trade with on " + self.publicIP.__str__()) return @@ -392,6 +483,15 @@ class Participant: # would be able to request power from the grid # in an overlapping period the participant simply takes the power from the grid via the old counter + async def findConfirmableTrade(self) -> Trade: + possibleTrade = None + for rp in self.remoteParticipants: + async with httpx.AsyncClient() as client: + response = await client.get("http://" + rp.publicIP.__str__() + ":8000/getTradeToSign") + possibleTrade = jsonpickle.decode(response.json()) + # TODO: continue here + + async def updateCurrentPower(self): self.__availablePower = self.currentInHouseSupply - self.__currentInHouseDemand # simulate some random fluctuations @@ -407,6 +507,10 @@ class Participant: logger.info("===> trying to find an offer on " + self.publicIP.__str__()) await self.findOffer() + if len(self.availableTrades) > 0: + conftrade: Trade = await self.findConfirmableTrade() + + class Weather: # Weather might be better done using this but time is running out diff --git a/remoteconnection.py b/remoteconnection.py index 106d34958c35f1a4d1e62ce001fd52fa7e83dcaf..6acc476c77eb7fe76137bba0df0705755453c236 100644 --- a/remoteconnection.py +++ b/remoteconnection.py @@ -4,6 +4,7 @@ import time class RemoteConnection: def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float, l: float): + super().__init__() self.publicIP: ipaddress.IPv4Address = ip self.publicKey: bytes = pk self.availableCapacity: float = cap @@ -20,3 +21,4 @@ class RemoteConnection: def __str__(self): return self.publicIP.__str__() + diff --git a/remoteline.py b/remoteline.py index 1a1b6228a25688454bd5034fea847b167e451a8b..29648a56ed3149361ac19be5a8ca3302dc77b9ff 100644 --- a/remoteline.py +++ b/remoteline.py @@ -10,6 +10,10 @@ class RemoteLine(remoteconnection.RemoteConnection): super().__init__(ip, pk, cap, ucap, ll) self.adjacentConnections = cons + def __repr__(self): + return (self.publicIP.__str__() + " " + self.publicKey.hex() + " " + str(self.availableCapacity) + " " + + str(self.usedCapacity) + " " + str(self.loss)) + class SignedRemoteLine: def __init__(self, line: RemoteLine, prev: 'SignedRemoteLine'): diff --git a/remotetransformer.py b/remotetransformer.py index 7794e0e21667823c8a13b49638e20c76389ee035..02e2f31aa7ee58b6ea1f2bd9838d64559e7a45eb 100644 --- a/remotetransformer.py +++ b/remotetransformer.py @@ -11,6 +11,10 @@ class RemoteTransformer(remoteconnection.RemoteConnection): super().__init__(ip, pk, cap, ucap, ll) self.adjacentLines: set[remoteline.RemoteLine] = cons + def __repr__(self): + return (self.publicIP.__str__() + " " + self.publicKey.hex() + " " + str(self.availableCapacity) + " " + + str(self.usedCapacity) + " " + str(self.loss)) + class SignedRemoteTransformer: def __init__(self, transformer: RemoteTransformer, prev: 'SignedRemoteTransformer'): diff --git a/request.py b/request.py index 38a58e0ccc78d8126277442abfb87c125f311a10..61fae780108446b024bd34844a3c5c2c8fdcabe5 100644 --- a/request.py +++ b/request.py @@ -5,3 +5,6 @@ from datetime import datetime class Request(exchange.Exchange): def __init__(self, pk: bytes, p: float, ppw: float, extime: datetime): exchange.Exchange.__init__(self, pk, p, ppw, extime) + + def __eq__(self, other): + return self.publicKey == other.publicKey and self.executiontime == other.executiontime diff --git a/trade.py b/trade.py index 2f20b29aa536dd752b8d6942142d71b9fd62988a..e504aabf04ef25bee665514616e776f0479a0425 100644 --- a/trade.py +++ b/trade.py @@ -1,15 +1,17 @@ import time import datetime +import rustworkx from pint import UnitRegistry import offer import request import route +from remoteparticipant import RemoteParticipant class Trade: - def __init__(self, off: offer.Offer, req: request.Request, pwr: float, ppw: float, rou: route.Route, + def __init__(self, off: offer.Offer, req: request.Request, pwr, ppw: float, rou: route.Route, extime: datetime.datetime): self.ureg = None self.Q_ = None @@ -17,13 +19,42 @@ class Trade: self.offer: offer.Offer = off self.request: request.Request = req self.route = rou + self.singedRoute = list() + self.signedCounter = 0 self.power = pwr self.__pricePerWatt = ppw self.__executionTime: datetime.datetime = extime - self.sig_off: bytes - self.sig_req: bytes + self.sig_off: bytes = bytes(0) + self.sig_req: bytes = bytes(0) + self.confirmed_trades: list[Trade] = [] + self.sig_chain: list[list[tuple[bytes, bytes]]] = [[], []] # public key, signature def set_ureg(self, ureg: UnitRegistry) -> None: self.ureg = ureg self.Q_ = ureg.Quantity self.power = self.Q_(self.power, ureg.watt) + + self.offer.set_ureg(ureg) + self.request.set_ureg(ureg) + + def __repr__(self): + return (self.offer.__repr__() + " " + self.request.__repr__() + " " + str(self.power) + " " + + str(self.__pricePerWatt) + " " + str(self.__executionTime)) + + def remove_ureg(self): + self.power = float(self.power.magnitude) + self.ureg = None + self.Q_ = None + + self.offer.remove_ureg() + self.request.remove_ureg() + + +class GenesisTrade(Trade): + def __init__(self, public_key: bytes, remote_me: RemoteParticipant): + g = rustworkx.PyGraph() + g.add_node(remote_me) + super().__init__(offer.Offer(public_key, 0, 0, datetime.datetime.now()), + request.Request(public_key, 0, 0, datetime.datetime.now()), 0, 0, + route.Route(remote_me, remote_me, g, 0), + datetime.datetime.now()) diff --git a/transformer.py b/transformer.py index fff67ddc22f5d5f1c7255e5b4d4668fbbe6cc695..2db1ef6bd7c47d365c60b02e882cb45e9777c7b1 100644 --- a/transformer.py +++ b/transformer.py @@ -1,12 +1,13 @@ import datetime import ipaddress +import time import httpx from apscheduler.schedulers.asyncio import AsyncIOScheduler import argparse import asyncio -from contextlib import asynccontextmanager -from fastapi import APIRouter, FastAPI +from contextlib import asynccontextmanager, redirect_stdout +from fastapi import APIRouter, FastAPI, Body import jsonpickle import uvicorn from apscheduler.jobstores.memory import MemoryJobStore @@ -24,10 +25,10 @@ class Transformer(Connection): def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler): super().__init__(cap, conips, sched) self.adjacentLines: set[RemoteLine] = set() - self.lastSignedTransformer: dict[datetime.datetime, SignedRemoteTransformer] = dict() + self.lastSignedTransformer: dict[float, SignedRemoteTransformer] = dict() # adding fastapi endpoints self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteTransformerJSON, methods=["GET"]) - self.fastRouter.add_api_route("/sign/{time}", self.sign, methods=["POST"]) + self.fastRouter.add_api_route("/sign/{extime}", self.sign, methods=["POST"]) # setting up scheduling run_date = datetime.datetime.now() + datetime.timedelta(hours=1, seconds=5) # +1 hour because timezones suck hard @@ -48,30 +49,41 @@ class Transformer(Connection): self.adjacentLines.update(result) logger.info("===> Transformer: " + self.publicIP.__str__() + " retrieved connections: " + len(result).__str__()) - async def sign(self, time: datetime.datetime, rtjson): - rt = jsonpickle.decode(rtjson) + async def sign(self, extime: float, body=Body()): + rt = jsonpickle.decode(body["remote"]) + logger.info( + "===> Transformer: " + self.publicIP.__str__() + " received sign request for " + rt.publicIP.__str__()) if rt.publicKey == self.publicKey: # check if the rt actually is me - if time not in self.lastSignedTransformer.keys(): - # has there been no route announced before this one? then create an origin node for the trust chain - origin = SignedRemoteTransformer( - RemoteTransformer(self.publicIP, self.publicKey, self.availableCapacity, - self.usedCapacity, self.adjacentLines, self.loss), None) - origin.isOrigin = True - origin.signature = self.dil.sign_with_input(self.__secretKey, origin.__str__()) - result = SignedRemoteTransformer(rt, origin) - result.signature = self.dil.sign_with_input(self.__secretKey, result.__str__()) - self.lastSignedTransformer[time] = result - return jsonpickle.encode(result) + if self.checkCapacity(rt): + if extime not in self.lastSignedTransformer.keys(): + # has there been no route announced before this one? then create an origin node for the trust chain + origin = SignedRemoteTransformer( + RemoteTransformer(self.publicIP, self.publicKey, self.availableCapacity, + self.usedCapacity, self.adjacentLines, self.loss), None) + origin.isOrigin = True + with redirect_stdout(None): + origin.signature = self.dil.sign_with_input(self.secretKey, origin.__repr__().encode()) + result = SignedRemoteTransformer(rt, origin) + with redirect_stdout(None): + result.signature = self.dil.sign_with_input(self.secretKey, rt.__repr__().encode()) + self.lastSignedTransformer[extime] = result + logger.info("===> Transformer: " + self.publicIP.__str__() + " signed first " + rt.publicIP.__str__()) + return jsonpickle.encode(result) + else: + # if there has been a route announced before, add the previous to the new one + result = SignedRemoteTransformer(rt, self.lastSignedTransformer[extime]) + with redirect_stdout(None): + result.signature = self.dil.sign_with_input(self.secretKey, rt.__repr__().encode()) + self.lastSignedTransformer[extime] = result + logger.info("===> Transformer: " + self.publicIP.__str__() + " signed " + rt.publicIP.__str__()) + return jsonpickle.encode(result) else: - # if there has been a route announced before, add the previous to the new one - result = SignedRemoteTransformer(rt, self.lastSignedTransformer[time]) - result.signature = self.dil.sign_with_input(self.__secretKey, result.__str__()) - self.lastSignedTransformer[time] = result - return jsonpickle.encode(result) + return "Insufficient capacity" else: return "Unauthorized" # better handling here would be nice + if __name__ == "__main__": parser = argparse.ArgumentParser(description='Connection service') parser.add_argument('--cap', type=float, required=True, help='Available capacity') diff --git a/util.py b/util.py index cbc70944d7902ffc227652f0fd036669d2eb31d4..ffa2a6d041dd198ebd4d2a30e6f0b3bc40bb7a47 100644 --- a/util.py +++ b/util.py @@ -20,7 +20,7 @@ def next_minute() -> datetime: now = datetime.now() # Calculate the first second of the next minute - start_of_minute = datetime(now.year, now.month, now.day, now.hour, now.minute, 0) + start_of_minute = datetime(now.year, now.month, now.day, now.hour, now.minute, 0, 0) start_of_minute += timedelta(minutes=1) return start_of_minute