Skip to content
Snippets Groups Projects
longevity.py 3.44 KiB
Newer Older
  • Learn to ignore specific revisions
  • import http.client
    import json
    import time
    import argparse
    import random
    
    def sleepRandomTime(min_time: int, max_time: int):
        """Block the caller for a random whole number of seconds.

        The duration is picked with ``random.randint``, so both ``min_time``
        and ``max_time`` are possible outcomes.
        """
        delay_seconds = random.randint(min_time, max_time)
        time.sleep(delay_seconds)
    
    
    def requestMode(baseURL: str, min_time: int, max_time: int):
        """Send KSA key requests to ``baseURL`` in an endless loop.

        Each iteration POSTs a JSON key request with an incrementing
        ``request_ID`` to ``/api/v1/keys/ksa_key_req``, prints the status and
        body of any response whose status is not 204, and then pauses via
        ``sleepRandomTime(min_time, max_time)``.

        The function never returns normally; the loop only ends when an
        exception (for example ``KeyboardInterrupt`` or a connection error)
        propagates out of it.

        Args:
            baseURL: Host (optionally ``host:port``) passed to
                ``http.client.HTTPConnection``.
            min_time: Lower bound forwarded to ``sleepRandomTime``.
            max_time: Upper bound forwarded to ``sleepRandomTime``.
        """
        endpoint = "/api/v1/keys/ksa_key_req"
        # The headers never change between iterations, so build them once.
        headers = {
            "Content-Type": "application/json"
        }
        request_id = 0
        while True:
            request_id += 1
            print(f"requestId: {request_id}")
            data = {
                "receiving_CKMS_ID": "968fd594-b0e7-41f0-ba4b-de259047a933",
                "request_ID": str(request_id),
                "key_properties": {
                    "number": 1,
                    "key_length": 256,
                    "timeout": 20,
                    "TTL": 24
                }
            }

            # A fresh connection is opened for every request.
            conn = http.client.HTTPConnection(baseURL)
            try:
                conn.request("POST", endpoint, body=json.dumps(data), headers=headers)

                # Anything other than 204 is reported, but the loop keeps going.
                response = conn.getresponse()
                if response.status != 204:
                    print(f"Request failed with status code {response.status}")
                    print(response.read().decode())
            finally:
                # The loop is only ever left through an exception, so close the
                # connection here to avoid leaking the socket on that path.
                conn.close()

            sleepRandomTime(min_time, max_time)
    
    
    def readFile(fileURL: str):
        """Parse a JSON-lines log file into a list of flattened records.

        Every non-blank line must be a JSON object with a ``source`` key and a
        ``body`` object containing ``request_ID``, ``process_ID`` and
        ``ksa_keys``. Blank or whitespace-only lines are skipped.

        Args:
            fileURL: Path of the log file to read.

        Returns:
            A list with one dict per record, in file order, each holding the
            keys ``source``, ``request_ID``, ``process_ID`` and ``ksa_keys``.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If a non-blank line is not valid JSON.
            KeyError: If a record lacks one of the expected keys.
        """
        parsed_lines = []
        # JSON text is UTF-8 by specification; do not depend on the locale.
        with open(fileURL, 'r', encoding="utf-8") as file:
            # Stream the file line by line instead of loading it all at once.
            for raw_line in file:
                line = raw_line.strip()
                if not line:
                    # An empty line (e.g. a trailing newline) is not a record.
                    continue
                data = json.loads(line)
                source = data["source"]
                body = data["body"]
                parsed_lines.append({
                    "source": source,
                    "request_ID": body["request_ID"],
                    "process_ID": body["process_ID"],
                    "ksa_keys": body["ksa_keys"],
                })

        return parsed_lines
    
    
    def analyzeMode(fileURLs: list):
        """Load the given log files and report whether they have equal length.

        Each path is parsed with ``readFile``. The number of records in every
        log is compared against the first log, and a single verdict is
        printed. Nothing is printed when ``fileURLs`` is empty.

        Args:
            fileURLs: Paths of the log files to compare.
        """
        logs = [readFile(fileURL) for fileURL in fileURLs]
        if not logs:
            return

        lengths = [len(log) for log in logs]
        # Compare only the *other* logs with the first one; comparing the first
        # log with itself would always report a match.
        if all(length == lengths[0] for length in lengths[1:]):
            print("Logs are the same length!")
        else:
            print(f"Logs differ in length: {lengths}")
    
    
    def main():
        """Parse the command-line options and run the selected mode.

        Options (all optional):
            --mode: "request" (default) or "analyze".
            --min_time: Minimum time in seconds, default 10.
            --max_time: Maximum time in seconds, default 240.
            --base_url: Base URL for the requests, default "127.0.0.1:9696".

        In request mode the time range is validated before any request is
        sent. An unknown mode or an invalid time range ends the program with
        an argparse usage error instead of being silently ignored or failing
        later inside the request loop.
        """
        parser = argparse.ArgumentParser(description="Longevity test script")
        parser.add_argument("--mode", type=str, required=False, default="request",
                            help="Mode of operation. Can be 'request' or 'analyze'.")
        parser.add_argument("--min_time", type=int, default=10,
                            required=False, help="Minimum time in seconds")
        parser.add_argument("--max_time", type=int, default=240,
                            required=False, help="Maximum time in seconds")
        parser.add_argument("--base_url", type=str, required=False,
                            default="127.0.0.1:9696",
                            help="Base URL for the requests")
        args = parser.parse_args()

        if args.mode == "request":
            # random.randint rejects an empty range and time.sleep rejects
            # negative values, so catch bad input before the loop starts.
            if args.min_time < 0 or args.max_time < args.min_time:
                parser.error(
                    "--min_time must be >= 0 and --max_time must be >= --min_time"
                )
            print("Running in request mode")
            requestMode(args.base_url, args.min_time, args.max_time)
        elif args.mode == "analyze":
            print("Running in analyze mode")
            analyzeMode(["longevity-tests/akms1-logs/akms-simulator.log", "longevity-tests/akms2-logs/akms-simulator.log"])
        else:
            # Previously an unrecognised mode did nothing and exited cleanly.
            parser.error(
                f"unknown mode '{args.mode}'; expected 'request' or 'analyze'"
            )
    
    # Run the command-line entry point only when this file is executed as a
    # script, not when it is imported as a module.
    if __name__ == "__main__":
        main()