Commit fb068e99 authored by Johannes Hitzinger

converted connections into lines and transformers
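The generic Connection role is split into separate Line and Transformer services: both inherit from the shared Connection base class and announce themselves to peers as RemoteLine / RemoteTransformer objects derived from RemoteConnection. A minimal sketch of the resulting hierarchy, with names taken from the diff below and bodies elided:

# Sketch only: signatures reconstructed from the diff below, not the full implementation.
class Connection:                                # connection.py: keys, capacity, FastAPI router, scheduler
    def __init__(self, cap, conips, sched): ...

class Line(Connection):                          # line.py: serves /asRemoteJSON as a RemoteLine
    async def asRemoteLineJSON(self): ...

class Transformer(Connection):                   # transformer.py: serves /asRemoteJSON as a RemoteTransformer
    async def asRemoteTransformerJSON(self): ...

class RemoteConnection:                          # remoteconnection.py: serialized view (ip, public key, capacity, loss)
    def __init__(self, ip, pk, cap, ucap, l): ...

class RemoteLine(RemoteConnection): ...          # remoteline.py: carries adjacentConnections
class RemoteTransformer(RemoteConnection): ...   # remotetransformer.py: carries adjacent lines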

parent bdb9632e
import datetime
import ipaddress
import argparse
import asyncio
import httpx
from dilithium import Dilithium
@@ -19,22 +17,7 @@ import logging
from remoteconnection import RemoteConnection
import socket
def get_pub_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
# doesn't even have to be reachable
s.connect(('10.254.254.254', 1))
ip = s.getsockname()[0]
except Exception:
ip = '127.0.0.1'
finally:
s.close()
return ip
from util import get_pub_ip
logger = logging.getLogger(__name__)
@@ -42,80 +25,15 @@ logger = logging.getLogger(__name__)
# TODO: is Connection (cable or transformer) involved in confirming trades? Can this be prevented?
class Connection:
def __init__(self, cap: float, cons: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
self.dil = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"])
self.__secretKey, self.__publicKey = self.dil.keygen(
self.__secretKey, self.publicKey = self.dil.keygen(
[randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max),
randint(0, numpy.iinfo('uint32').max), randint(0, numpy.iinfo('uint32').max)])
self.fastRouter = APIRouter()
self.scheduler = sched
self.remoteIPs: set[ipaddress.IPv4Address] = cons
self.adjacentConnections: set[RemoteConnection] = set()
self.remoteIPs: set[ipaddress.IPv4Address] = conips
self.publicIP = get_pub_ip()
self.availableCapacity = cap
self.usedCapacity = 0
        self.lineloss = 0.01  # this is a wrong assumption; actual line loss depends on the load
# adding fastapi endpoints
self.fastRouter.add_api_route("/getConnections", self.getConnections, methods=["GET"])
self.fastRouter.add_api_route("/getCapacity", self.getCapacity, methods=["GET"])
self.fastRouter.add_api_route("/asRemoteConnectionJSON", self.asRemoteConnectionJSON, methods=["GET"])
# setting up scheduling
run_date = datetime.datetime.now() + datetime.timedelta(hours=1,
seconds=5) # +1 hour because timezones suck hard
self.scheduler.add_job(self.retrieveConnections, 'date', run_date=run_date, id='1', )
async def getConnections(self):
return jsonpickle.encode(self.adjacentConnections)
async def getCapacity(self):
return self
async def retrieveConnections(self):
result: set[RemoteConnection] = set()
for ip in self.remoteIPs:
async with httpx.AsyncClient() as client:
response = await client.get("http://" + ip.__str__() + ":8000/asRemoteConnectionJSON")
result.add(jsonpickle.decode(response.json()))
self.adjacentConnections.update(result)
async def asRemoteConnectionJSON(self):
rc: RemoteConnection = RemoteConnection(self.publicIP, self.__publicKey, self.availableCapacity,
self.usedCapacity, self.adjacentConnections)
return jsonpickle.encode(rc)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Connection service')
parser.add_argument('--cap', type=float, required=True, help='Available capacity')
parser.add_argument('--cons', type=str, nargs='+', required=True,
help='List of IP addresses of adjacent connections')
args = parser.parse_args()
argcons = set(ipaddress.IPv4Address(ip) for ip in args.cons)
jobstores = {
'default': MemoryJobStore()
}
scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
logging.basicConfig(
level=logging.INFO,
handlers=[
logging.StreamHandler()
]
)
logger.info(argcons)
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler.start()
yield
scheduler.shutdown()
fast_app = FastAPI(lifespan=lifespan)
con = Connection(args.cap, argcons, scheduler)
fast_app.include_router(con.fastRouter)
uvicorn.run(fast_app, host=get_pub_ip(), port=8000)
        self.loss = 0.01  # this is a wrong assumption; actual line loss depends on the load
@@ -19,7 +19,7 @@ services:
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/dht/dockerfile
connection100:
transformer100:
networks:
network1:
ipv4_address: 172.20.0.100
@@ -27,9 +27,9 @@ services:
- 8100:8000
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/connection/dockerfile
command: ["python", "./connection.py", "--cap", "1000", "--cons", "172.20.0.101", "172.20.0.102", "172.20.0.103"]
connection101:
dockerfile: /home/jonny0815/git/dismagr/transformer/dockerfile
command: ["python", "./transformer.py", "--cap", "1000", "--cons", "172.20.0.101", "172.20.0.102", "172.20.0.103"]
line101:
networks:
network1:
ipv4_address: 172.20.0.101
@@ -37,9 +37,9 @@ services:
- 8101:8000
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/connection/dockerfile
command: [ "python", "./connection.py", "--cap", "1000" , "--cons", "172.20.0.100"]
connection102:
dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100"]
line102:
networks:
network1:
ipv4_address: 172.20.0.102
@@ -47,9 +47,9 @@ services:
- 8102:8000
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/connection/dockerfile
command: [ "python", "./connection.py", "--cap", "1000" , "--cons", "172.20.0.100" ]
connection103:
dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100" ]
line103:
networks:
network1:
ipv4_address: 172.20.0.103
@@ -57,8 +57,8 @@ services:
- 8103:8000
build:
context: .
dockerfile: /home/jonny0815/git/dismagr/connection/dockerfile
command: [ "python", "./connection.py", "--cap", "1000" , "--cons", "172.20.0.100" ]
dockerfile: /home/jonny0815/git/dismagr/line/dockerfile
command: [ "python", "./line.py", "--cap", "1000" , "--cons", "172.20.0.100" ]
part11:
networks:
network1:
......
line.py 0 → 100644
import datetime
import ipaddress
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import argparse
import asyncio
from contextlib import asynccontextmanager
from fastapi import APIRouter, FastAPI
import jsonpickle
import uvicorn
from apscheduler.jobstores.memory import MemoryJobStore
from util import get_pub_ip
import logging
from connection import Connection
from remotetransformer import RemoteTransformer
from remoteparticipant import RemoteParticipant
from remoteline import RemoteLine
logger = logging.getLogger(__name__)
class Line(Connection):
def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
super().__init__(cap, conips, sched)
self.adjacentConnections: set[RemoteTransformer | RemoteParticipant] = set()
# adding fastapi endpoints
self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteLineJSON, methods=["GET"])
# setting up scheduling
run_date = datetime.datetime.now() + datetime.timedelta(hours=1,
seconds=5) # +1 hour because timezones suck hard
self.scheduler.add_job(self.retrieveConnections, 'date', run_date=run_date, id='1', )
async def retrieveConnections(self):
result: set[RemoteTransformer | RemoteParticipant] = set()
for ip in self.remoteIPs:
async with httpx.AsyncClient() as client:
response = await client.get("http://" + ip.__str__() + ":8000/asRemoteJSON")
result.add(jsonpickle.decode(response.json()))
self.adjacentConnections.update(result)
# logger.info("===> Line: " + self.publicIP.__str__() + " retrieved connections: " + len(result).__str__())
async def asRemoteLineJSON(self):
rl: RemoteLine = RemoteLine(self.publicIP, self.publicKey, self.availableCapacity,
self.usedCapacity, self.adjacentConnections, self.loss)
return jsonpickle.encode(rl)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Connection service')
parser.add_argument('--cap', type=float, required=True, help='Available capacity')
parser.add_argument('--cons', type=str, nargs='+', required=True,
help='List of IP addresses of adjacent connections')
args = parser.parse_args()
argcons = set(ipaddress.IPv4Address(ip) for ip in args.cons)
jobstores = {
'default': MemoryJobStore()
}
scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
logging.basicConfig(
level=logging.INFO,
handlers=[
logging.StreamHandler()
]
)
logger.info(argcons)
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler.start()
yield
scheduler.shutdown()
fast_app = FastAPI(lifespan=lifespan)
lin = Line(args.cap, argcons, scheduler)
fast_app.include_router(lin.fastRouter)
uvicorn.run(fast_app, host=get_pub_ip(), port=8000)
FROM python:3.12
# Or any preferred Python version.
ADD exchange.py offer.py request.py trade.py remoteparticipant.py connection.py remoteconnection.py route.py .
ADD exchange.py offer.py request.py trade.py remoteparticipant.py connection.py remoteconnection.py route.py line.py remotetransformer.py remoteline.py util.py .
RUN pip install numpy pint fastapi uvicorn dilithium jsonpickle requests httpx apscheduler rustworkx
CMD ["python", "./connection.py"]
CMD ["python", "./line.py"]
# Or enter the name of your unique directory and parameter set.
\ No newline at end of file
@@ -89,6 +89,7 @@ async def lifespan(app: FastAPI):
fast_app = FastAPI(lifespan=lifespan)
# start the server
'''
if __name__ == "__main__":
@scheduler.scheduled_job('interval', seconds=100)
def scheduled_job_1():
@@ -100,3 +101,4 @@ if __name__ == "__main__":
#scheduler.add_job(part.requestremoteParticipants, 'interval', seconds=20, id='1', name='requestRemoteParticipants')
fast_app.include_router(part.fastRouter)
uvicorn.run(fast_app, host=get_ip(), port=8000)
'''
@@ -20,12 +20,15 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler
from dilithium import Dilithium
import dilithium.dilithium
import numpy
from util import get_pub_ip
from offer import Offer
from request import Request
from exchange import Exchange
from trade import Trade
from remoteconnection import RemoteConnection
from remoteline import RemoteLine
from remotetransformer import RemoteTransformer
from remoteparticipant import RemoteParticipant
@@ -34,22 +37,6 @@ import logging
logger = logging.getLogger(__name__)
import socket
def get_pub_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
# doesn't even have to be reachable
s.connect(('10.254.254.254', 1))
ip = s.getsockname()[0]
except Exception:
ip = '127.0.0.1'
finally:
s.close()
return ip
class Participant:
def __init__(self, nid: str, dht_ep: set[ipaddress.IPv4Address], con_ip: ipaddress.IPv4Address,
@@ -64,8 +51,9 @@ class Participant:
self.__dht_network_id = nid
self.publicIP: ipaddress.IPv4Address = ipaddress.IPv4Address(
ip) # set to current ipv4, confirm before each new round
self.connection: RemoteConnection = None
self.grid = rx.PyGraph()
self.line: RemoteLine = None
self.grid: rx.PyGraph = rx.PyGraph()
        self.gridindexes: dict[bytes, int] = dict()  # mapping of public keys to indexes in the grid graph
self.knownIPs: set[ipaddress.IPv4Address] = set()
self.remoteParticipants: set[RemoteParticipant] = set()
self.availableExchanges: set[
@@ -83,13 +71,14 @@ class Participant:
self.fastRouter.add_api_route("/get_remoteparticipants_asJSON", self.get_remoteparticipants_asjson,
methods=["GET"])
self.fastRouter.add_api_route("/get_knownIPs", self.get_knownIPs, methods=["GET"])
self.fastRouter.add_api_route("/get_ownLine", self.get_ownline, methods=["GET"])
# announce self on dht
asyncio.run(self.dht_startup())
# self.loop.run_in_executor(None, self.dht_startup)
# get potential peers from dht
asyncio.run(self.dht_update_peers())
# retrieve information from my grid connection
asyncio.run(self.buildOwnConnection(con_ip))
asyncio.run(self.buildOwnLine(con_ip))
# set up scheduling for tasks
self.scheduler.add_job(self.request_remoteparticipants, 'interval', seconds=20, id='1',
name='requestRemoteParticipants')
@@ -146,17 +135,40 @@ class Participant:
self.knownIPs.clear()
return
else: # continued peer updates without dht, periodic dht discovery should still happen to discover new users
logger.info("===>" + self.publicIP.__str__() + " has connection " + self.connection.publicIP.__str__()) # TODO
logger.info(
"===>" + self.publicIP.__str__() + " has connection " + self.line.publicIP.__str__()) # TODO
async def get_knownIPs(self):
return jsonpickle.encode(self.knownIPs)
async def buildOwnConnection(self, con_ip: ipaddress.IPv4Address):
time.sleep(1) # sleep to make sure that the connection is already up
async def buildOwnLine(self, con_ip: ipaddress.IPv4Address):
await asyncio.sleep(1) # sleep to make sure that the connection is already up
async with httpx.AsyncClient() as client:
response = await client.get("http://" + con_ip.__str__() + ":8000/asRemoteConnectionJSON")
response = await client.get("http://" + con_ip.__str__() + ":8000/asRemoteJSON")
self.line = jsonpickle.decode(response.json())
logger.info("===> finished buildOwnLine on " + self.publicIP.__str__() + " with " + self.line.publicIP.__str__())
self.connection = jsonpickle.decode(response.json())
async def get_ownline(self):
return jsonpickle.encode(self.line)
async def discoverGrid(self): # this is not robust
pass
'''
i: int = self.grid.add_node(self.connection)
self.gridindexes[self.connection.publicKey] = i
await self.discoverFromNode(self.connection.publicKey)
'''
    async def discoverFromNode(self, nodepublickey: bytes):
        # walk the adjacency list of an already known grid node and add newly discovered connections to the grid graph
        for con in self.grid[self.gridindexes[nodepublickey]].adjacentConnections:
            async with httpx.AsyncClient() as client:
                response = await client.get("http://" + con.publicIP.__str__() + ":8000/asRemoteJSON")
                rc = jsonpickle.decode(response.json())
                if rc.publicKey not in self.gridindexes:
                    i: int = self.grid.add_node(rc)
                    self.gridindexes[rc.publicKey] = i
                    self.grid.add_edge(self.gridindexes[nodepublickey], i, con.loss)
if __name__ == "__main__":
......
FROM python:3.12
# Or any preferred Python version.
ADD exchange.py offer.py participant.py request.py trade.py remoteparticipant.py settings.py remoteconnection.py route.py .
ADD exchange.py offer.py participant.py request.py trade.py remoteparticipant.py settings.py remoteconnection.py remotetransformer.py remoteline.py route.py util.py .
RUN pip install numpy pint fastapi uvicorn dilithium jsonpickle requests httpx apscheduler rustworkx
CMD ["python", "./participant.py"]
# Or enter the name of your unique directory and parameter set.
\ No newline at end of file
@@ -3,11 +3,16 @@ import time
class RemoteConnection:
def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float, cons: set['RemoteConnection']):
def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float, l):
self.publicIP: ipaddress.IPv4Address = ip
self.publicKey: bytes = pk
self.__availableCapacity: float = cap
self.__usedCapacity: float = ucap
self.loss = l # loss rate during transmission or transformation
self.timestamp = time.time_ns()
self.adjacentConnections: set[RemoteConnection] = cons
def __eq__(self, other):
return self.publicKey == other.publicKey
def __hash__(self):
return hash(self.publicKey)
import ipaddress
import time
import remoteconnection
import remotetransformer
import remoteparticipant
class RemoteLine(remoteconnection.RemoteConnection):
def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float,
cons, ll):
super().__init__(ip, pk, cap, ucap, ll)
self.adjacentConnections = cons
@@ -36,3 +36,9 @@ class RemoteParticipant:
return True
else:
return False
def __eq__(self, other):
return self.publicKey == other.publicKey
def __hash__(self):
return hash(self.publicKey)
\ No newline at end of file
import ipaddress
import time
import remoteconnection
import remoteline
class RemoteTransformer(remoteconnection.RemoteConnection):
def __init__(self, ip: ipaddress.IPv4Address, pk: bytes, cap: float, ucap: float,
cons: set['remoteline.RemoteLine'], ll):
super().__init__(ip, pk, cap, ucap, ll)
self.adjacentConnections: set[remoteline.RemoteLine] = cons
transformer.py 0 → 100644
import datetime
import ipaddress
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import argparse
import asyncio
from contextlib import asynccontextmanager
from fastapi import APIRouter, FastAPI
import jsonpickle
import uvicorn
from apscheduler.jobstores.memory import MemoryJobStore
from util import get_pub_ip
import logging
from connection import Connection
from remotetransformer import RemoteTransformer
from remoteline import RemoteLine
logger = logging.getLogger(__name__)
class Transformer(Connection):
def __init__(self, cap: float, conips: set[ipaddress.IPv4Address], sched: AsyncIOScheduler):
super().__init__(cap, conips, sched)
self.adjacentLines: set[RemoteLine] = set()
# adding fastapi endpoints
self.fastRouter.add_api_route("/asRemoteJSON", self.asRemoteTransformerJSON, methods=["GET"])
# setting up scheduling
run_date = datetime.datetime.now() + datetime.timedelta(hours=1,
seconds=5) # +1 hour because timezones suck hard
self.scheduler.add_job(self.retrieveConnections, 'date', run_date=run_date, id='1', )
async def asRemoteTransformerJSON(self):
rt: RemoteTransformer = RemoteTransformer(self.publicIP, self.publicKey, self.availableCapacity,
self.usedCapacity, self.adjacentLines, self.loss)
return jsonpickle.encode(rt)
async def retrieveConnections(self):
result: set[RemoteLine] = set()
for ip in self.remoteIPs:
async with httpx.AsyncClient() as client:
response = await client.get("http://" + ip.__str__() + ":8000/asRemoteJSON")
result.add(jsonpickle.decode(response.json()))
self.adjacentLines.update(result)
logger.info("===> Transformer: " + self.publicIP.__str__() + " retrieved connections: " + len(result).__str__())
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Connection service')
parser.add_argument('--cap', type=float, required=True, help='Available capacity')
parser.add_argument('--cons', type=str, nargs='+', required=True,
help='List of IP addresses of adjacent connections')
args = parser.parse_args()
argcons = set(ipaddress.IPv4Address(ip) for ip in args.cons)
jobstores = {
'default': MemoryJobStore()
}
scheduler = AsyncIOScheduler(jobstores=jobstores, timezone='Europe/Berlin')
logging.basicConfig(
level=logging.INFO,
handlers=[
logging.StreamHandler()
]
)
logger.info(argcons)
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler.start()
yield
scheduler.shutdown()
fast_app = FastAPI(lifespan=lifespan)
tra = Transformer(args.cap, argcons, scheduler)
fast_app.include_router(tra.fastRouter)
uvicorn.run(fast_app, host=get_pub_ip(), port=8000)
FROM python:3.12
# Or any preferred Python version.
ADD exchange.py offer.py request.py trade.py remoteparticipant.py connection.py remoteconnection.py route.py transformer.py remoteline.py remotetransformer.py util.py .
RUN pip install numpy pint fastapi uvicorn dilithium jsonpickle requests httpx apscheduler rustworkx
CMD ["python", "./transformer.py"]
# Or enter the name of your unique directory and parameter set.
\ No newline at end of file
util.py 0 → 100644
import socket
def get_pub_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
# doesn't even have to be reachable
s.connect(('10.254.254.254', 1))
ip = s.getsockname()[0]
except Exception:
ip = '127.0.0.1'
finally:
s.close()
return ip
\ No newline at end of file