Skip to content
Snippets Groups Projects
controller.py 2.42 KiB
Newer Older
  • Learn to ignore specific revisions
  • James Immanuel Magsino's avatar
    hmm
    James Immanuel Magsino committed
    import ipaddress
    
    James Immanuel Magsino's avatar
    hmm
    James Immanuel Magsino committed
    import time
    import socket
    import logging
    
    logging.basicConfig(level=logging.INFO)
    
    
    James Immanuel Magsino's avatar
    James Immanuel Magsino committed
    
    
    # "10.12.X.0/29" in Pi lab
    # 10.0.1.0/29 in homelab
    #WORKER_IPS = ["10.12.4.1", "10.12.4.2","10.12.4.3","10.12.4.4","10.12.4.5","10.12.4.6"]
    
    James Immanuel Magsino's avatar
    James Immanuel Magsino committed
    WORKER_IPS = ["10.0.1.2", "10.0.1.5", "10.0.1.6", "10.0.1.7"]
    
    James Immanuel Magsino's avatar
    James Immanuel Magsino committed
    
    
    James Immanuel Magsino's avatar
    hmm
    James Immanuel Magsino committed
    WORKER_PORT = 5000
    BUFFER_SIZE = 1024
    
    
    print(os.environ['WORKERS'])
    
    James Immanuel Magsino's avatar
    James Immanuel Magsino committed
    #print("hello world")
    
    James Immanuel Magsino's avatar
    hmm
    James Immanuel Magsino committed
    
    
    James Immanuel Magsino's avatar
    James Immanuel Magsino committed
    #for ip in ipaddress.IPv4Network("10.0.1.0/29", False):
    #    print(ip)
    #    WORKER_IPS.append(str(ip))
    
    James Immanuel Magsino's avatar
    hmm
    James Immanuel Magsino committed
    
    def send_health_check():
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as controller_socket:
            rtt_results = []
    
            for i, worker_ip in enumerate(WORKER_IPS):
                
    
    James Immanuel Magsino's avatar
    James Immanuel Magsino committed
                if i > (int(os.environ['WORKERS']) - 1):
    
    James Immanuel Magsino's avatar
    James Immanuel Magsino committed
                    break 
    
                for j in range(10):
    
                    start_time = time.time()
                    message = "Health check"
                    controller_socket.sendto(message.encode(), (worker_ip, WORKER_PORT))
                    logging.info(f"Sent health check to Worker {i + 1} at {worker_ip}:{WORKER_PORT}")
    
                    try:
                        controller_socket.settimeout(10)
                        data, _ = controller_socket.recvfrom(BUFFER_SIZE)
                        end_time = time.time()
                        rtt = (end_time - start_time) * 1000  # RTT in milliseconds
    
                        rtt_result.append(rtt)
    
                        logging.info(f"Received response from Worker {i + 1} ({worker_ip}): {data.decode()}")
                        logging.info(f"RTT for Worker {i + 1}: {rtt:.2f} ms")     
                    except socket.timeout:
                        logging.warning(f"Timeout: No response from Worker {i + 1} ({worker_ip})")
                        rtt_result.append(None)
    
                valid_rtts = [rtt for rtt in rtt_result if rtt is not None]
    
                if valid_rtts:
                    avg_rtt = sum(valid_rtts) / len(valid_rtts)
                    logging.info(f"Average RTT for worker {i + 1}: {avg_rtt:.2f} ms")
                    rtt_results.append(avg_rtt)
    
    James Immanuel Magsino's avatar
    hmm
    James Immanuel Magsino committed
    
    
            # Calculate and log the average RTT if there are valid results
            valid_rtts = [rtt for rtt in rtt_results if rtt is not None]
            if valid_rtts:
                avg_rtt = sum(valid_rtts) / len(valid_rtts)
    
                logging.info(f"Average total RTT: {avg_rtt:.2f} ms")
    
    James Immanuel Magsino's avatar
    hmm
    James Immanuel Magsino committed
            else:
                logging.warning("No valid RTTs received")
    
    if __name__ == "__main__":
        send_health_check()