Skip to content
Snippets Groups Projects
Commit fa611a3a authored by Johannes Hitzinger's avatar Johannes Hitzinger
Browse files

connections between two participants can be confirmed by both

started with trust chain, its a mess
parent 6c58ff5e
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,6 @@ from util import get_pub_ip
logger = logging.getLogger(__name__)
# TODO: is Connection (cable or transformer) involved in confirming trades? can this be be prevented?
class Connection:
......@@ -39,8 +38,13 @@ class Connection:
self.usedCapacity = 0
self.loss = 0.01 # this is a wrong assumption, actual line loss is depending on the load
self.fastRouter.add_api_route("/getPublicKey", self.getPublicKey, methods=["GET"])
def checkCapacity(self, rc: RemoteConnection):
if self.availableCapacity - self.usedCapacity > rc.tradeCapacity:
self.usedCapacity += rc.tradeCapacity + rc.loss
return True
return False
async def getPublicKey(self):
return jsonpickle.encode(self.publicKey)
\ No newline at end of file
......@@ -4,22 +4,26 @@ from datetime import datetime, timedelta
class Exchange:
def __init__(self, pk: bytes, p: float, ppw: float, extime: datetime):
def __init__(self, pk: bytes, p, ppw: float, extime: datetime):
self.ureg = None
self.Q_ = None
self.pubicKey: bytes = pk
self.publicKey: bytes = pk
self.timestamp: int = time.time_ns()
self.executiontime: datetime = extime
self.__isActive: bool = False
self.power = p
self.pricePerWatt = ppw # this is in EUR, no new cryptocurrency should be created for this
self.__sig: bytes
def set_ureg(self, ureg: UnitRegistry) -> None:
self.ureg = ureg
self.Q_ = ureg.Quantity
self.power = self.Q_(self.power, ureg.watt)
def remove_ureg(self):
self.power = float(self.power.magnitude)
self.ureg = None
self.Q_ = None
def activate(self) -> None:
self.__isActive = True
......@@ -62,3 +66,7 @@ class Exchange:
def is_old(self) -> bool:
now = datetime.now()
return self.executiontime.timestamp() < int(now.timestamp())
def __repr__(self):
return self.pubicKey.__repr__() + " " + str(self.power) + " " + str(self.pricePerWatt) + " " + str(
self.executiontime)
\ No newline at end of file
import datetime
import ipaddress
import time
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import argparse
import asyncio
from contextlib import asynccontextmanager
from fastapi import APIRouter, FastAPI
from contextlib import asynccontextmanager, redirect_stdout
from fastapi import APIRouter, FastAPI, Body
import jsonpickle
import uvicorn
from apscheduler.jobstores.memory import MemoryJobStore
......@@ -25,10 +26,10 @@ class Line(Connection):
def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
super().__init__(cap, conips, sched)
self.adjacentConnections: set[RemoteTransformer | RemoteParticipant] = set()
self.lastSignedLine: dict[datetime.datetime, SignedRemoteLine] = dict()
self.lastSignedLine: dict[float, SignedRemoteLine] = dict()
# adding fastapi endpoints
self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteLineJSON, methods=["GET"])
self.fastRouter.add_api_route("/sign/{time}", self.sign, methods=["POST"])
self.fastRouter.add_api_route("/sign/{extime}", self.sign, methods=["POST"])
# setting up scheduling
run_date = datetime.datetime.now() + datetime.timedelta(hours=1,
seconds=5) # +1 hour because timezones suck hard
......@@ -49,28 +50,33 @@ class Line(Connection):
self.usedCapacity, self.adjacentConnections, self.loss)
return jsonpickle.encode(rl)
async def sign(self, time: datetime.datetime, rljson):
print(rljson)
rl = jsonpickle.decode(rljson)
async def sign(self, extime: float, body=Body()):
rl = jsonpickle.decode(body["remote"])
logger.info("===> Line: " + self.publicIP.__str__() + " received sign request for " + rl.publicIP.__str__())
if rl.publicKey == self.publicKey: # check if the rl actually is me
if self.checkCapacity(rl):
print(self.secretKey)
time = time.replace(second=0, microsecond=0) # we are scheduling in minute intervals
if time not in self.lastSignedLine.keys():
if extime not in self.lastSignedLine.keys():
origin = SignedRemoteLine(
RemoteLine(self.publicIP, self.publicKey, self.availableCapacity,
self.usedCapacity, self.adjacentConnections, self.loss), None)
origin.isOrigin = True
origin.signature = self.dil.sign_with_input(self.secretKey, origin.__str__().encode())
with redirect_stdout(None):
origin.signature = self.dil.sign_with_input(self.secretKey, origin.__repr__().encode())
result = SignedRemoteLine(rl, origin)
result.signature = self.dil.sign_with_input(self.secretKey, result.__str__().encode())
self.lastSignedLine[time] = result
with redirect_stdout(None):
result.signature = self.dil.sign_with_input(self.secretKey, rl.__repr__().encode())
# logger.info("------------------->" + rl.__repr__().encode().__str__())
self.lastSignedLine[extime] = result
logger.info("===> Line: " + self.publicIP.__str__() + " signed first " + rl.publicIP.__str__())
return jsonpickle.encode(result)
else:
# if there has been a route announced before, add the previous to the new one
result = SignedRemoteLine(rl, self.lastSignedLine[time])
result.signature = self.dil.sign_with_input(self.secretKey, result.__str__().encode())
self.lastSignedLine[time] = result
result = SignedRemoteLine(rl, self.lastSignedLine[extime])
with redirect_stdout(None):
result.signature = self.dil.sign_with_input(self.secretKey, rl.__repr__().encode())
self.lastSignedLine[extime] = result
logger.info("===> Line: " + self.publicIP.__str__() + " signed " + rl.publicIP.__str__())
return jsonpickle.encode(result)
else: # TODO: Rest Error handling
return "Not enough capacity" # better handling here would be nice
......
......@@ -11,7 +11,7 @@ import jsonpickle
import numpy
import uvicorn
import logging
from fastapi import FastAPI, Path, APIRouter
from fastapi import FastAPI, Path, APIRouter, Body
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.memory import MemoryJobStore
......@@ -54,6 +54,7 @@ def get_ip():
return IP
'''
class CustomDijkstraVisitor(DijkstraVisitor):
def __init__(self):
super().__init__()
......@@ -62,7 +63,6 @@ class CustomDijkstraVisitor(DijkstraVisitor):
def discover_vertex(self, vertex, distance):
print(f"Discovered vertex: {vertex} with distance: {distance}")
def examine_edge(self, edge):
for e in self.examined_edges:
if edge[0] == e[1] and edge[1] == e[0]:
......@@ -83,6 +83,7 @@ visitor = CustomDijkstraVisitor()
# Perform Dijkstra's search
rx.dijkstra_search(graph, [0], lambda _: 1.0, visitor)
'''
'''
class GraphNode:
......@@ -148,7 +149,7 @@ e1 = p1 * t1
#e1.ito(ureg.kilowatt_hour)
print(e1)
'''
'''
d1 = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"])
pk, sk = d1.keygen([random.randint(0, numpy.iinfo('uint32').max), random.randint(0, numpy.iinfo('uint32').max),
random.randint(0, numpy.iinfo('uint32').max),
......@@ -156,8 +157,27 @@ pk, sk = d1.keygen([random.randint(0, numpy.iinfo('uint32').max), random.randint
msg = b"this is a message"
sig = d1.sign_with_input(sk, msg)
result = d1.verify(pk, msg, sig)
print(sk)
print(result)
#print(sk)
#print(result)
'''
from trade import Trade
from offer import Offer
from request import Request
from route import Route
from remoteparticipant import RemoteParticipant
o1 = Offer(bytes(123), 0.1, 0.1, datetime.datetime.now())
r1 = Request(bytes(123), 0.1, 0.1, datetime.datetime.now())
rp1 = RemoteParticipant(bytes(123), ipaddress.IPv4Address('1.2.3.4'), [o1], set())
rp2 = RemoteParticipant(bytes(123), ipaddress.IPv4Address('1.2.3.4'), [r1], set())
rou = Route(rp1, rp2, rx.PyGraph(), 0.1)
t1 = Trade(o1, r1, 0.1, 0.1, rou, datetime.datetime.now())
j = jsonpickle.encode(t1)
t2 = jsonpickle.decode(j)
print(t2)
jobstores = {
'default': MemoryJobStore()
......@@ -181,21 +201,30 @@ async def lifespan(app: FastAPI):
fast_app = FastAPI(lifespan=lifespan)
# start the server
async def test(t=Body()):
print(t)
return t
fast_app.add_api_route("/test", test, methods=["POST"])
# start the server
'''
if __name__ == "__main__":
@scheduler.scheduled_job('interval', seconds=100)
def scheduled_job_1():
logger.info("hello from " + get_ip())
print(datetime.datetime.now())
l = line.Line(cap=100, conips={ipaddress.IPv4Address('0.0.0.0')}, sched=scheduler)
rl = asyncio.run(l.asRemoteLineJSON())
print(rl)
#part = participant.Participant(nid="test_network", dht_ep={ipaddress.IPv4Address(
# part = participant.Participant(nid="test_network", dht_ep={ipaddress.IPv4Address(
# '172.20.0.2')}, con_ip=ipaddress.IPv4Address('172.20.1.1')) # Network_ID should be a Hash for real world use, provide ip adresses of dht endpoints for first discovery
#scheduler.add_job(part.requestremoteParticipants, 'interval', seconds=20, id='1', name='requestRemoteParticipants')
# scheduler.add_job(part.requestremoteParticipants, 'interval', seconds=20, id='1', name='requestRemoteParticipants')
fast_app.include_router(l.fastRouter)
uvicorn.run(fast_app, host=get_ip(), port=8000)
'''
......@@ -5,3 +5,4 @@ from datetime import datetime
class Offer(exchange.Exchange):
def __init__(self, pk: bytes, p: float, ppw: float, extime: datetime):
exchange.Exchange.__init__(self, pk, p, ppw, extime)
......@@ -2,7 +2,7 @@ from pint import UnitRegistry, set_application_registry
import locale
import time
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, redirect_stdout
from datetime import datetime, timedelta
from collections import defaultdict
......@@ -34,14 +34,14 @@ from route import Route
from offer import Offer
from request import Request
from exchange import Exchange
from trade import Trade
from trade import Trade, GenesisTrade
from remoteconnection import RemoteConnection
from remoteline import RemoteLine
from remotetransformer import RemoteTransformer
from remoteline import RemoteLine, SignedRemoteLine
from remotetransformer import RemoteTransformer, SignedRemoteTransformer
from remoteparticipant import RemoteParticipant
from fastapi import APIRouter, FastAPI
from fastapi import APIRouter, FastAPI, Body
import logging
logger = logging.getLogger(__name__)
......@@ -54,7 +54,7 @@ class Participant:
def __init__(self, nid: str, dht_ep: set[ipaddress.IPv4Address], con_ip: ipaddress.IPv4Address,
scd: AsyncIOScheduler, init_demand: float, solar_wattage: int, ip: ipaddress = get_pub_ip()):
self.dil = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"])
self.__secretKey, self.publicKey = self.dil.keygen(
self.publicKey, self.__secretKey = self.dil.keygen(
[randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max),
randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max)])
self.fastRouter = APIRouter()
......@@ -69,10 +69,13 @@ class Participant:
self.gridEdgeIndexes: dict[bytes:set[int]] = defaultdict(set)
self.knownIPs: set[ipaddress.IPv4Address] = set() # known IPs from other participants
self.remoteParticipants: set[RemoteParticipant] = set()
self.ExchangeQueue: list[Exchange] = list() # own exchanges for future turns
self.activeTrades: set[Trade] = set() # own active trades for this turn
self.__tradeHistory: list[Trade] = [] # every own past trade
self.solarWattage = solar_wattage = solar_wattage
self.ExchangeQueue: list[Exchange] = list() # own available exchanges for future turns
self.availableTrades: list[Trade] = list() # own available trades for future turns waiting for confirmation
self.TradeQueue: list[Trade] = list() # fully confirmed trades waiting to be enacted
self.activeTrades: list[Trade] = list() # own active trades for this turn
self.__tradeHistory: list[Trade] = list() # every own past trade
self.__accountBalance: float = 0.0
self.solarWattage = solar_wattage
self.__availablePower = Q_(0, ureg.watt) # possible power exchange with the grid
self.__energyOffset = Q_(0, ureg.watt * ureg.hour) # energy offset for the next round
self.__currentInHouseDemand = Q_(init_demand, ureg.watt) # real time demand from household appliances, storage
......@@ -88,6 +91,8 @@ class Participant:
self.fastRouter.add_api_route("/getOffersJSON", self.getOffersJSON, methods=["GET"])
self.fastRouter.add_api_route("/get_knownIPs", self.get_knownIPs, methods=["GET"])
self.fastRouter.add_api_route("/getgridpng", self.getGridPNG, methods=["GET"])
self.fastRouter.add_api_route("/announceTrade", self.announceTrade, methods=["POST"])
self.fastRouter.add_api_route("/getTradeToSign", self.getTradeToSign, methods=["GET"])
# announce self on dht
asyncio.run(self.dht_startup())
# self.loop.run_in_executor(None, self.dht_startup)
......@@ -100,6 +105,12 @@ class Participant:
time.sleep(10)
logger.info("~~~> starting discoverGrid") # participants will be added after each request_remote_participants
asyncio.run(self.discoverGrid())
# generate and add GenesisTrade
gt = GenesisTrade(self.publicKey, self.as_remote_participant())
gt.sig_off = self.dil.sign_with_input(self.__secretKey, gt.offer.__repr__().encode())
gt.sig_req = self.dil.sign_with_input(self.__secretKey, gt.request.__repr__().encode())
self.__tradeHistory.append(gt)
# set up scheduling for tasks
self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=15, id='1',
name='requestRemoteParticipants')
......@@ -207,6 +218,8 @@ class Participant:
url = 'http://' + rp.publicIP.__str__() + ':8000/allRemoteParticipantsJSON'
async with httpx.AsyncClient() as client:
response = await client.get(url)
# this works the first time round but starts throwing errors after, probably because of
# timing issues, only appears when this task runs during route announcement, not critical
remoteparticipants: set[RemoteParticipant] = jsonpickle.decode(response.json())
# some validation of remoteparticipants should happen, not in scope of this thesis
all_remoteparticipants.update(remoteparticipants)
......@@ -277,6 +290,35 @@ class Participant:
"""
return HTMLResponse(content=html_content)
async def getTradeToSign(self):
if len(self.availableTrades) > 0: # prefer trades that actually need confirmation
return jsonpickle.encode(self.availableTrades[0])
if len(self.TradeQueue) > 0: # allow already confirmed trades to be signed
return jsonpickle.encode(self.TradeQueue[-1])
if len(self.activeTrades) > 0: # allow active trade to be signed again (this should be avoided)
return jsonpickle.encode(self.activeTrades[-1])
return jsonpickle.encode(self.__tradeHistory[-1]) # return the last trade in the history, this should only
# happen when the participant is new and has no trades beside the GenesisTrade
async def announceTrade(self, body=Body()):
# here a Trade will be announced from another participant which
# tries to match their offer with the request of this participant
trade: Trade = jsonpickle.decode(body["trade"])
for i in trade.singedRoute:
async with httpx.AsyncClient() as client:
if isinstance(i, SignedRemoteLine):
response = await client.get("http://" + i.line.publicIP.__str__() + ":8000/getPublicKey")
pk = jsonpickle.decode(response.json())
# logger.info("--------------->" + i.line.__repr__().encode().__str__())
if not self.dil.verify(pk, i.line.__repr__().encode(), i.signature):
logger.info("===> Signature not valid")
return "Error: Signature not valid"
with redirect_stdout(None):
trade.sig_offer = self.dil.sign_with_input(self.__secretKey, trade.offer.__repr__().encode())
logger.info("===> confirmed trade")
return jsonpickle.encode(trade)
async def produceNextTrade(self):
# here we can take into account if there is a variable consumer in the house (e.g. electric car or heat pump)
......@@ -295,8 +337,13 @@ class Participant:
logger.info("===> checking if new exchange has to be created on " + self.publicIP.__str__())
for t in self.ExchangeQueue: # only produce a new exchange if there is no exchange for the next minute
if t.is_min_next():
for e in self.ExchangeQueue: # only produce a new exchange if there is no exchange for the next minute
if e.is_min_next():
ex_next_min = True
for t in self.TradeQueue:
# if an exchange already has been matched for the next minute, no new exchange is needed
if t.request.is_min_next() and t.offer.is_min_next():
ex_next_min = True
if power > Q_(0, ureg.watt) and not ex_next_min:
......@@ -319,6 +366,9 @@ class Participant:
self.ExchangeQueue.append(r)
logger.info("===> created request on " + self.publicIP.__str__())
if ex_next_min:
logger.info("===> exchange or trade for next minute already exists on " + self.publicIP.__str__())
async def findRoute(self, start: RemoteParticipant, end: RemoteParticipant) -> Route:
s = self.gridNodeIndexes[start.publicKey]
e = self.gridNodeIndexes[end.publicKey]
......@@ -327,19 +377,18 @@ class Participant:
graph = self.grid.subgraph(path[e])
return Route(start, end, graph, routeloss)
async def announceRoute(self, t: Trade):
async def announceRoute(self, t: Trade) -> Trade:
class GridVisitor(DijkstraVisitor):
def __init__(self, trade: Trade):
super().__init__()
self.examined_edges = set()
self.trade: Trade = trade
self.to_be_signed = list()
def discover_vertex(self, vertex, distance):
with httpx.Client as client:
response = client.post("http://" + self.trade.route.graph[vertex].publicIP.__str__() + ":8000/sign/"
+ self.trade.offer.executiontime.__str__() + "?rljson=" + jsonpickle.encode(
self.trade.route.graph[vertex]))
logger.info(f"===> Discovered Trafo: {self.trade.route.graph[vertex].publicIP}")
self.to_be_signed.append(self.trade.route.graph[vertex])
self.trade.signedCounter += 1
def examine_edge(self, edge):
for e in self.examined_edges:
......@@ -349,11 +398,29 @@ class Participant:
logger.info(f"===>Examining Line: {edge[2].publicIP}")
self.examined_edges.add(edge)
self.to_be_signed.append(edge[2])
self.trade.signedCounter += 1
visitor = GridVisitor(t)
# walk the route and announce the route to all grid elements involved
rx.dijkstra_search(t.route.graph, [self.gridNodeIndexes[t.route.start.publicKey]], lambda _: 1.0, visitor)
for r in visitor.to_be_signed:
if isinstance(r, RemoteLine) or isinstance(r, RemoteTransformer):
async with httpx.AsyncClient() as client:
logger.info("===> announcing route to " + r.publicIP.__str__())
response = await client.post("http://" + r.publicIP.__str__() + ":8000/sign/" +
int(t.offer.executiontime.timestamp()).__str__(),
json={"remote": jsonpickle.encode(r)})
# TODO: Continue here
signed = jsonpickle.decode(response.json())
# some verification should happen here to handle malicious participants
t.singedRoute.append(signed)
logger.info("===> finished signing route for " + len(t.singedRoute).__str__() + " of " +
(t.signedCounter - 2).__str__() + " participants") # - 2 because start and end are not signed
return t
async def findOffer(self, discarded_rps=frozenset()):
closest = self.as_remote_participant() # this is just to have a rp to compare to
for rp in self.remoteParticipants:
......@@ -369,6 +436,7 @@ class Participant:
o.set_ureg(ureg) # update the ureg of the offer for pint to work
if o.is_min_next(): # once again, only trying to find an offer for the next minute
if o.power >= self.ExchangeQueue[0].power:
logger.info("===> found offer on " + closest.publicIP.__str__())
# the offer is big enough to fulfill the request
# now find the route to the offering participant
r = await self.findRoute(self.as_remote_participant(), closest)
......@@ -378,11 +446,34 @@ class Participant:
# and again, only trades for the next minute are possible at this point
t = Trade(o, self.ExchangeQueue[0], self.ExchangeQueue[0].power, o.pricePerWatt, r,
next_minute())
t.set_ureg(ureg)
# now announce the route to all connections involved
await self.announceRoute(t)
signedt = await self.announceRoute(t)
# sign the trade to mark it as valid
with redirect_stdout(None):
signedt.sig_req = self.dil.sign_with_input(self.__secretKey, t.__repr__().encode())
signedt.remove_ureg()
# route made problems with serialisation and is not really needed because signed route
# holds the important information, this should be fixed but time is running out
signedt.route = None
response = await client.post("http://" + closest.publicIP.__str__() + ":8000/announceTrade",
json={"trade": jsonpickle.encode(signedt)})
signedt.set_ureg(ureg)
logger.info("===> announced trade to " + closest.publicIP.__str__())
confirmedt = jsonpickle.decode(response.json())
if isinstance(confirmedt, Trade):
# offering participant will only return the trade if its valid
logger.info("===> received confirmation from " + closest.publicIP.__str__())
if confirmedt.request == self.ExchangeQueue[0]: # own request is fulfilled by trade
self.availableTrades.append(confirmedt)
self.ExchangeQueue.pop(0)
logger.info("===> found offer on " + closest.publicIP.__str__())
else:
logger.info("===> no one to trade with on " + self.publicIP.__str__())
return
......@@ -392,6 +483,15 @@ class Participant:
# would be able to request power from the grid
# in an overlapping period the participant simply takes the power from the grid via the old counter
async def findConfirmableTrade(self) -> Trade:
possibleTrade = None
for rp in self.remoteParticipants:
async with httpx.AsyncClient() as client:
response = await client.get("http://" + rp.publicIP.__str__() + ":8000/getTradeToSign")
possibleTrade = jsonpickle.decode(response.json())
# TODO: continue here
async def updateCurrentPower(self):
self.__availablePower = self.currentInHouseSupply - self.__currentInHouseDemand
# simulate some random fluctuations
......@@ -407,6 +507,10 @@ class Participant:
logger.info("===> trying to find an offer on " + self.publicIP.__str__())
await self.findOffer()
if len(self.availableTrades) > 0:
conftrade: Trade = await self.findConfirmableTrade()
class Weather:
# Weather might be better done using this but time is running out
......
......@@ -4,6 +4,7 @@ import time
class RemoteConnection:
def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float, l: float):
super().__init__()
self.publicIP: ipaddress.IPv4Address = ip
self.publicKey: bytes = pk
self.availableCapacity: float = cap
......@@ -20,3 +21,4 @@ class RemoteConnection:
def __str__(self):
return self.publicIP.__str__()
......@@ -10,6 +10,10 @@ class RemoteLine(remoteconnection.RemoteConnection):
super().__init__(ip, pk, cap, ucap, ll)
self.adjacentConnections = cons
def __repr__(self):
return (self.publicIP.__str__() + " " + self.publicKey.hex() + " " + str(self.availableCapacity) + " " +
str(self.usedCapacity) + " " + str(self.loss))
class SignedRemoteLine:
def __init__(self, line: RemoteLine, prev: 'SignedRemoteLine'):
......
......@@ -11,6 +11,10 @@ class RemoteTransformer(remoteconnection.RemoteConnection):
super().__init__(ip, pk, cap, ucap, ll)
self.adjacentLines: set[remoteline.RemoteLine] = cons
def __repr__(self):
return (self.publicIP.__str__() + " " + self.publicKey.hex() + " " + str(self.availableCapacity) + " " +
str(self.usedCapacity) + " " + str(self.loss))
class SignedRemoteTransformer:
def __init__(self, transformer: RemoteTransformer, prev: 'SignedRemoteTransformer'):
......
......@@ -5,3 +5,6 @@ from datetime import datetime
class Request(exchange.Exchange):
def __init__(self, pk: bytes, p: float, ppw: float, extime: datetime):
exchange.Exchange.__init__(self, pk, p, ppw, extime)
def __eq__(self, other):
return self.publicKey == other.publicKey and self.executiontime == other.executiontime
import time
import datetime
import rustworkx
from pint import UnitRegistry
import offer
import request
import route
from remoteparticipant import RemoteParticipant
class Trade:
def __init__(self, off: offer.Offer, req: request.Request, pwr: float, ppw: float, rou: route.Route,
def __init__(self, off: offer.Offer, req: request.Request, pwr, ppw: float, rou: route.Route,
extime: datetime.datetime):
self.ureg = None
self.Q_ = None
......@@ -17,13 +19,42 @@ class Trade:
self.offer: offer.Offer = off
self.request: request.Request = req
self.route = rou
self.singedRoute = list()
self.signedCounter = 0
self.power = pwr
self.__pricePerWatt = ppw
self.__executionTime: datetime.datetime = extime
self.sig_off: bytes
self.sig_req: bytes
self.sig_off: bytes = bytes(0)
self.sig_req: bytes = bytes(0)
self.confirmed_trades: list[Trade] = []
self.sig_chain: list[list[tuple[bytes, bytes]]] = [[], []] # public key, signature
def set_ureg(self, ureg: UnitRegistry) -> None:
self.ureg = ureg
self.Q_ = ureg.Quantity
self.power = self.Q_(self.power, ureg.watt)
self.offer.set_ureg(ureg)
self.request.set_ureg(ureg)
def __repr__(self):
return (self.offer.__repr__() + " " + self.request.__repr__() + " " + str(self.power) + " " +
str(self.__pricePerWatt) + " " + str(self.__executionTime))
def remove_ureg(self):
self.power = float(self.power.magnitude)
self.ureg = None
self.Q_ = None
self.offer.remove_ureg()
self.request.remove_ureg()
class GenesisTrade(Trade):
def __init__(self, public_key: bytes, remote_me: RemoteParticipant):
g = rustworkx.PyGraph()
g.add_node(remote_me)
super().__init__(offer.Offer(public_key, 0, 0, datetime.datetime.now()),
request.Request(public_key, 0, 0, datetime.datetime.now()), 0, 0,
route.Route(remote_me, remote_me, g, 0),
datetime.datetime.now())
import datetime
import ipaddress
import time
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import argparse
import asyncio
from contextlib import asynccontextmanager
from fastapi import APIRouter, FastAPI
from contextlib import asynccontextmanager, redirect_stdout
from fastapi import APIRouter, FastAPI, Body
import jsonpickle
import uvicorn
from apscheduler.jobstores.memory import MemoryJobStore
......@@ -24,10 +25,10 @@ class Transformer(Connection):
def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
super().__init__(cap, conips, sched)
self.adjacentLines: set[RemoteLine] = set()
self.lastSignedTransformer: dict[datetime.datetime, SignedRemoteTransformer] = dict()
self.lastSignedTransformer: dict[float, SignedRemoteTransformer] = dict()
# adding fastapi endpoints
self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteTransformerJSON, methods=["GET"])
self.fastRouter.add_api_route("/sign/{time}", self.sign, methods=["POST"])
self.fastRouter.add_api_route("/sign/{extime}", self.sign, methods=["POST"])
# setting up scheduling
run_date = datetime.datetime.now() + datetime.timedelta(hours=1,
seconds=5) # +1 hour because timezones suck hard
......@@ -48,30 +49,41 @@ class Transformer(Connection):
self.adjacentLines.update(result)
logger.info("===> Transformer: " + self.publicIP.__str__() + " retrieved connections: " + len(result).__str__())
async def sign(self, time: datetime.datetime, rtjson):
rt = jsonpickle.decode(rtjson)
async def sign(self, extime: float, body=Body()):
rt = jsonpickle.decode(body["remote"])
logger.info(
"===> Transformer: " + self.publicIP.__str__() + " received sign request for " + rt.publicIP.__str__())
if rt.publicKey == self.publicKey: # check if the rt actually is me
if time not in self.lastSignedTransformer.keys():
# has there been no route announced before this one? then create an origin node for the trust chain
origin = SignedRemoteTransformer(
RemoteTransformer(self.publicIP, self.publicKey, self.availableCapacity,
self.usedCapacity, self.adjacentLines, self.loss), None)
origin.isOrigin = True
origin.signature = self.dil.sign_with_input(self.__secretKey, origin.__str__())
result = SignedRemoteTransformer(rt, origin)
result.signature = self.dil.sign_with_input(self.__secretKey, result.__str__())
self.lastSignedTransformer[time] = result
return jsonpickle.encode(result)
if self.checkCapacity(rt):
if extime not in self.lastSignedTransformer.keys():
# has there been no route announced before this one? then create an origin node for the trust chain
origin = SignedRemoteTransformer(
RemoteTransformer(self.publicIP, self.publicKey, self.availableCapacity,
self.usedCapacity, self.adjacentLines, self.loss), None)
origin.isOrigin = True
with redirect_stdout(None):
origin.signature = self.dil.sign_with_input(self.secretKey, origin.__repr__().encode())
result = SignedRemoteTransformer(rt, origin)
with redirect_stdout(None):
result.signature = self.dil.sign_with_input(self.secretKey, rt.__repr__().encode())
self.lastSignedTransformer[extime] = result
logger.info("===> Transformer: " + self.publicIP.__str__() + " signed first " + rt.publicIP.__str__())
return jsonpickle.encode(result)
else:
# if there has been a route announced before, add the previous to the new one
result = SignedRemoteTransformer(rt, self.lastSignedTransformer[extime])
with redirect_stdout(None):
result.signature = self.dil.sign_with_input(self.secretKey, rt.__repr__().encode())
self.lastSignedTransformer[extime] = result
logger.info("===> Transformer: " + self.publicIP.__str__() + " signed " + rt.publicIP.__str__())
return jsonpickle.encode(result)
else:
# if there has been a route announced before, add the previous to the new one
result = SignedRemoteTransformer(rt, self.lastSignedTransformer[time])
result.signature = self.dil.sign_with_input(self.__secretKey, result.__str__())
self.lastSignedTransformer[time] = result
return jsonpickle.encode(result)
return "Insufficient capacity"
else:
return "Unauthorized" # better handling here would be nice
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Connection service')
parser.add_argument('--cap', type=float, required=True, help='Available capacity')
......
......@@ -20,7 +20,7 @@ def next_minute() -> datetime:
now = datetime.now()
# Calculate the first second of the next minute
start_of_minute = datetime(now.year, now.month, now.day, now.hour, now.minute, 0)
start_of_minute = datetime(now.year, now.month, now.day, now.hour, now.minute, 0, 0)
start_of_minute += timedelta(minutes=1)
return start_of_minute
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment