Skip to content
Snippets Groups Projects
main.py 4.86 KiB
Newer Older
  • Learn to ignore specific revisions
  • import asyncio
    
    from contextlib import asynccontextmanager
    import time
    
    import jsonpickle
    
    import logging
    
    from contextlib import asynccontextmanager
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.jobstores.memory import MemoryJobStore
    
    
    from pydantic import BaseModel
    from typing import Annotated
    
    import dilithium.dilithium
    from dilithium import Dilithium
    
    
    from pint import UnitRegistry
    
    import settings
    
    import DHTdummy
    
    import rustworkx as rx
    
    from rustworkx.visit import DijkstraVisitor
    
    import matplotlib.pyplot as plt
    from rustworkx.visualization import mpl_draw
    
    
    # Module-wide pint unit registry.
    ureg = UnitRegistry()
    # Shorthand for the registry's Quantity class, so quantities can be built as Q_(value, unit).
    Q_ = ureg.Quantity
    
    # get ip of this device/container
    
    import socket
    
    
    def get_ip() -> str:
        """Return the local IPv4 address of this device/container.

        A UDP socket is connected to a dummy private address and the local
        address the socket got bound to is read back. The target does not
        have to be reachable for this to work.

        Returns:
            The local IPv4 address as a dotted-quad string, or ``'127.0.0.1'``
            when the lookup fails with a socket-level error.
        """
        # The context manager closes the socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.settimeout(0)
            try:
                # doesn't even have to be reachable
                sock.connect(('10.254.254.254', 1))
                return sock.getsockname()[0]
            except OSError:
                # Only socket/OS failures trigger the loopback fallback;
                # unrelated programming errors are no longer hidden.
                return '127.0.0.1'
    
    
    
    class CustomDijkstraVisitor(DijkstraVisitor):
        """Dijkstra visitor that prints its progress.

        Every discovered vertex is printed. Every examined edge is printed and
        remembered in ``examined_edges``, unless an edge with swapped endpoints
        was remembered before, in which case it is reported as already examined.
        """

        def __init__(self):
            super().__init__()
            # Edges that were reported as examined so far.
            self.examined_edges = set()

        def discover_vertex(self, vertex, distance):
            """Print the discovered vertex together with its distance."""
            print(f"Discovered vertex: {vertex} with distance: {distance}")

        def examine_edge(self, edge):
            """Print ``edge`` and record it, skipping already-seen reversed edges.

            Only the first two items of ``edge`` are compared
            (presumably source and target index — confirm against rustworkx).
            """
            reverse_already_seen = any(
                edge[0] == seen[1] and edge[1] == seen[0]
                for seen in self.examined_edges
            )
            if reverse_already_seen:
                print(f"Edge already examined: {edge}")
                return

            print(f"Examining edge: {edge}")
            self.examined_edges.add(edge)
    
    
    # Demo graph: four nodes (payloads 0..3) joined in a chain by three weighted edges.
    graph = rx.PyGraph()
    graph.add_nodes_from(list(range(4)))
    graph.add_edges_from([
        (0, 1, 1.0),
        (1, 2, 2.0),
        (2, 3, 1.0),
    ])

    # Visitor instance that prints what the search discovers and examines.
    visitor = CustomDijkstraVisitor()

    # Run Dijkstra's search from node 0; the cost callback returns 1.0 for
    # every edge, so the weights stored on the edges are not used here.
    rx.dijkstra_search(graph, [0], lambda _edge: 1.0, visitor)
    
    '''
    
    class GraphNode:
        def __init__(self, value):
            self.index = None
            self.value = value
    
        def __str__(self):
            return f"GraphNode: {self.value} @ index: {self.index}"
    
    
    class GraphEdge:
        def __init__(self, value):
            self.index = None
            self.value = value
    
        def __str__(self):
            return f"EdgeNode: {self.value} @ index: {self.index}"
    
    graph = rx.PyGraph()
    graph.add_nodes_from([GraphNode(i) for i in range(8)])
    graph.add_edges_from([(i, i + 1, GraphEdge(1)) for i in range(7)])
    # Populate index attribute in GraphNode objects
    for index in graph.node_indices():
        graph[index].index = index
    # Populate index attribute in GraphEdge objects
    for index, data in graph.edge_index_map().items():
        data[2].index = index
    print("Nodes:")
    for node in graph.nodes():
        print(node)
    print("Edges:")
    for edge in graph.edges():
        print(edge)
    
    mpl_draw(graph, with_labels=True, edge_labels=str, labels=str)
    path = rx.dijkstra_shortest_paths(graph, 3, 5, lambda e: e.value)
    
    length = rx.dijkstra_shortest_path_lengths(graph, 2, lambda e: e.value, 6)
    print(length)
    
    plt.show()
    
    
    route = graph.subgraph(path[5])
    
    plt.clf()
    mpl_draw(route, with_labels=True, edge_labels=str, labels=str)
    plt.show()
    '''
    
    
    # --------------- Examples -----------------------
    
    
    '''
    
    a1 = Q_(4, ureg.amp)
    p1 = v1 * a1
    p1.ito(ureg.watt)
    
    #e1.ito(ureg.kilowatt_hour)
    
    d1 = Dilithium(dilithium.dilithium.DEFAULT_PARAMETERS["dilithium3"])
    
    pk, sk = d1.keygen([random.randint(0, numpy.iinfo('uint32').max), random.randint(0, numpy.iinfo('uint32').max),
                        random.randint(0, numpy.iinfo('uint32').max),
                        random.randint(0, numpy.iinfo('uint32').max)])
    
    sig = d1.sign_with_input(sk, msg)
    
    result = d1.verify(pk, msg, sig)
    
    #print(result)
    '''
    # Register APScheduler's MemoryJobStore as the 'default' job store.
    jobstores = dict(default=MemoryJobStore())
    scheduler = AsyncIOScheduler(timezone='Europe/Berlin', jobstores=jobstores)

    # Root logging: INFO and above, emitted through a single stream handler.
    logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler()])
    # Log through the 'uvicorn' logger.
    logger = logging.getLogger('uvicorn')
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Keep the module-level ``scheduler`` running while the context is active.

        The scheduler is started before control is yielded and shut down
        when the context is left.

        Args:
            app: The application this context is attached to; not used here.
                NOTE(review): ``FastAPI`` is not imported in the visible part
                of this file — confirm the import exists.
        """
        scheduler.start()
        try:
            yield
        finally:
            # Shut the scheduler down even when an exception is thrown into
            # the generator at ``yield``; previously shutdown was skipped then.
            scheduler.shutdown()
    
    
    
    # NOTE(review): `fast_app` is only assigned in the commented-out line below but is
    # used further down; `participant`, `ipaddress` and `uvicorn` are not imported in the
    # visible part of this file — confirm before running.
    # NOTE(review): the statements below are indented one level deeper than this comment,
    # so Python attaches them to whatever block precedes them; presumably an
    # `if __name__ == "__main__":` guard was intended — verify.
    # fast_app = FastAPI(lifespan=lifespan)
    
        # Interval job: every 100 seconds, log a greeting containing this host's IP.
        @scheduler.scheduled_job('interval', seconds=100)
        def scheduled_job_1():
            logger.info("hello from " + get_ip())
    
        # Create the participant with a network id, a set of DHT endpoint addresses and a con_ip.
        part = participant.Participant(nid="test_network", dht_ep={ipaddress.IPv4Address(
    
            '172.20.0.2')}, con_ip=ipaddress.IPv4Address('172.20.1.1'))  # Network_ID should be a hash for real-world use; provide the IP addresses of the DHT endpoints for first discovery
        # Disabled: periodic requestremoteParticipants job.
        #scheduler.add_job(part.requestremoteParticipants, 'interval', seconds=20, id='1', name='requestRemoteParticipants')
    
        # Mount the participant's router on the app and serve it on this host's IP, port 8000.
        fast_app.include_router(part.fastRouter)
        uvicorn.run(fast_app, host=get_ip(), port=8000)