// server.cpp
// Multi-threaded TCP key-value server: each request is a one-byte length header
// followed by a text body of the form "<op>,<key>[,<value>];".
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <system_error>
#include <thread>
#include <vector>

// TCP port the server binds to and listens on.
#define PORT 2525
// Request body size, in bytes, that handle_client is dimensioned for.
#define BODY_BUFFER_SIZE 64
// Number of header bytes read ahead of every body; the header byte carries the body length.
#define HEADER_SIZE 1

// Operation selectors, compared against the operation character of a request.
#define STORE_OPERATION 's'
#define GET_OPERATION 'g'
#define DELETE_OPERATION 'd'

// Exclusive lock taken around DB by processRequest_slow and processRequest_unique_mutex.
std::mutex mtx;
// Reader/writer lock taken around DB by processRequest_shared_mtx
// (shared for get, exclusive for store and delete).
// NOTE(review): mtx and shared_mtx are independent locks, so the request
// handlers built on different locks do not exclude each other.
std::shared_mutex shared_mtx;
// In-memory key -> value store accessed by all request handlers.
std::map<std::string, std::string> DB;
// Requests counted since the last report: incremented in handle_client,
// read and reset by throughput_monitor.
std::atomic<int> request_count(0);

// Reports the request rate once per second, forever.
//
// Sleeps for one second, then prints how many requests were counted in
// request_count since the previous report and starts the next window at zero.
// Never returns; intended to run on its own thread.
void throughput_monitor() {
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));

        // exchange() reads and resets the counter in one atomic step. A separate
        // load() followed by store(0) silently drops every increment that a
        // client thread performs between the two calls.
        const int handled = request_count.exchange(0, std::memory_order_relaxed);

        std::cout << "Anfragen pro Sekunde: " << handled << std::endl;
    }
}

//=========== SLOW PARSER ====================
// Parsed form of one textual request: "<op>,<key>[,<value>];".
struct Request {
    char operation = '\0';  // operation character ('\0' when the input had no token)
    std::string key;        // second token, empty when missing
    std::string value;      // third token, only filled for the store operation
};

// Splits `buffer` on ',' and ';' into operation, key and (for 's') value.
//
// Parameters:
//   buffer  NUL-terminated request text; may be nullptr.
// Returns:
//   A Request allocated with `new` that the caller must release with `delete`,
//   or nullptr when `buffer` is nullptr or the working copy cannot be allocated.
//   Missing tokens leave the corresponding fields at their defaults.
//
// Consecutive delimiters are collapsed by the tokenizer, so empty fields
// cannot be expressed (e.g. "s,,v;" yields key "v" and no value).
Request* parseRequest(const char* buffer) {
    if (buffer == nullptr) {
        return nullptr;
    }

    Request* req = new Request();

    // The tokenizer writes NUL bytes into its input, so work on a private copy.
    char* input = strdup(buffer);
    if (!input) {
        delete req;  // do not leak the request when the copy cannot be made
        return nullptr;
    }

    // strtok_r keeps its position in `cursor` instead of hidden static state,
    // which makes the parser safe to call from several client threads at once.
    char* cursor = nullptr;
    char* token = strtok_r(input, ",;", &cursor);
    if (token) {
        // First token: only its first character selects the operation.
        req->operation = token[0];

        // Second token: the key.
        token = strtok_r(nullptr, ",;", &cursor);
        if (token) {
            req->key = token;

            // Third token: the value, which only the store operation carries.
            if (req->operation == 's') {
                token = strtok_r(nullptr, ",;", &cursor);
                if (token) {
                    req->value = token;
                }
            }
        }
    }

    free(input);
    return req;
}
// we assume that this function is slow
std::string processRequest_slow(const std::string& request) {

    std::string response;   
    Request* parsedRequest = parseRequest(request.c_str());

    std::lock_guard<std::mutex> lock(mtx);

    switch (parsedRequest->operation) {
        case STORE_OPERATION:
            DB[parsedRequest->key] = parsedRequest->value;
            response = "Stored [" + parsedRequest->key + "] = " + parsedRequest->value;
            break;
        case GET_OPERATION:
            if (DB.find(parsedRequest->key) != DB.end()) {
                response = "Value of [" + parsedRequest->key + "] is " + DB[parsedRequest->key];
            } else {
                response = "Key [" + parsedRequest->key + "] not found";
            }
            break;
        case DELETE_OPERATION:
            if (DB.erase(parsedRequest->key)) {
                response = "Deleted key [" + parsedRequest->key + "]";
            } else {
                response = "Key [" + parsedRequest->key + "] not found for deletion";
            }
            break;
        default:
            response = "Invalid operation!";
            break;
    }
    free(parsedRequest);
    return response;
}
//=============================================

// Request handler that parses in place and serialises every operation
// (reads as well as writes) on the global exclusive mutex `mtx`.
//
// Request format: "<op>,<key>;" or "<op>,<key>,<value>;". Everything after the
// first ';' is ignored. The value may itself contain ',' characters.
//
// Parameters:
//   request  the request text.
// Returns:
//   The response text for the client; "Bad request!" when the request has no
//   ',' before its ';' terminator, "Invalid operation!" for an unknown operation.
std::string processRequest_unique_mutex(const std::string& request) {
    const std::size_t end = request.find(';');
    const std::size_t first = request.find(',');

    // The key separator has to exist and has to come before the terminator;
    // otherwise the length arithmetic below would wrap around.
    if (first == std::string::npos || end == std::string::npos || first > end) {
        return "Bad request!";
    }

    const char operation = request[0];
    const std::size_t second = request.find(',', first + 1);

    std::string key;
    std::string value;

    // A second ',' only separates key and value when it lies inside the
    // request proper, i.e. before the ';'. A ',' found after the terminator
    // belongs to trailing data and must not be used as a field boundary.
    if (second != std::string::npos && second < end) {
        key = request.substr(first + 1, second - first - 1);
        value = request.substr(second + 1, end - second - 1);
    } else {
        key = request.substr(first + 1, end - first - 1);
    }

    std::string response;
    std::lock_guard<std::mutex> lock(mtx);

    switch (operation) {
        case STORE_OPERATION:
            DB[key] = value;
            response = "Stored [" + key + "] = " + value;
            break;
        case GET_OPERATION: {
            // Single lookup: reuse the iterator instead of searching again with operator[].
            const auto found = DB.find(key);
            if (found != DB.end()) {
                response = "Value of [" + key + "] is " + found->second;
            } else {
                response = "Key [" + key + "] not found";
            }
            break;
        }
        case DELETE_OPERATION:
            if (DB.erase(key)) {
                response = "Deleted key [" + key + "]";
            } else {
                response = "Key [" + key + "] not found for deletion";
            }
            break;
        default:
            response = "Invalid operation!";
            break;
    }

    return response;
}
// Request handler using the reader/writer lock `shared_mtx`: several threads
// may execute get operations at the same time, while store and delete take
// the lock exclusively.
//
// Request format: "<op>,<key>;" or "<op>,<key>,<value>;". Everything after the
// first ';' is ignored. The value may itself contain ',' characters.
//
// Parameters:
//   request  the request text.
// Returns:
//   The response text for the client; "Bad request!" when the request has no
//   ',' before its ';' terminator, "Invalid operation!" for an unknown operation.
std::string processRequest_shared_mtx(const std::string& request) {
    const std::size_t end = request.find(';');
    const std::size_t first = request.find(',');

    // The key separator has to exist and has to come before the terminator;
    // otherwise the length arithmetic below would wrap around.
    if (first == std::string::npos || end == std::string::npos || first > end) {
        return "Bad request!";
    }

    const char operation = request[0];
    const std::size_t second = request.find(',', first + 1);

    std::string key;
    std::string value;

    // A second ',' only separates key and value when it lies before the ';'.
    // A ',' found after the terminator belongs to trailing data.
    if (second != std::string::npos && second < end) {
        key = request.substr(first + 1, second - first - 1);
        value = request.substr(second + 1, end - second - 1);
    } else {
        key = request.substr(first + 1, end - first - 1);
    }

    std::string response;

    switch (operation) {
        case STORE_OPERATION: {
            std::unique_lock<std::shared_mutex> lock(shared_mtx);
            DB[key] = value;
            response = "Stored [" + key + "] = " + value;
            break;
        }
        case GET_OPERATION: {
            std::shared_lock<std::shared_mutex> lock(shared_mtx);
            // Only find() is used here: std::map::operator[] is a mutating
            // operation as far as data races are concerned and must not run
            // concurrently in several readers holding the shared lock.
            const auto found = DB.find(key);
            if (found != DB.end()) {
                response = "Value of [" + key + "] is " + found->second;
            } else {
                response = "Key [" + key + "] not found";
            }
            break;
        }
        case DELETE_OPERATION: {
            std::unique_lock<std::shared_mutex> lock(shared_mtx);
            if (DB.erase(key)) {
                response = "Deleted key [" + key + "]";
            } else {
                response = "Key [" + key + "] not found for deletion";
            }
            break;
        }
        default:
            response = "Invalid operation!";
            break;
    }

    return response;
}

// Reads exactly `length` bytes from `fd` into `dst`, retrying short reads.
// Returns false when the peer closed the connection or a read error occurred.
static bool read_exact(int fd, void* dst, std::size_t length) {
    char* out = static_cast<char*>(dst);
    std::size_t done = 0;
    while (done < length) {
        const ssize_t got = read(fd, out + done, length - done);
        if (got < 0 && errno == EINTR) {
            continue;  // interrupted by a signal before any data arrived: retry
        }
        if (got <= 0) {
            return false;
        }
        done += static_cast<std::size_t>(got);
    }
    return true;
}

// Sends all of `data` on `fd`, retrying short writes.
// Returns false when the connection is no longer writable.
static bool send_all(int fd, const std::string& data) {
    std::size_t sent = 0;
    while (sent < data.size()) {
        // MSG_NOSIGNAL: a peer that already closed the connection must yield an
        // error here instead of a SIGPIPE that would terminate the whole server.
        const ssize_t put = send(fd, data.data() + sent, data.size() - sent, MSG_NOSIGNAL);
        if (put < 0 && errno == EINTR) {
            continue;
        }
        if (put <= 0) {
            return false;
        }
        sent += static_cast<std::size_t>(put);
    }
    return true;
}

// Serves one client connection until the peer disconnects, then closes the socket.
//
// Each request is a HEADER_SIZE-byte header holding the body length followed
// by that many body bytes. The body is handed to processRequest_shared_mtx and
// the response text is sent back. A zero-length body, or one longer than
// BODY_BUFFER_SIZE, is answered with "Bad request!". Every answered request
// increments request_count.
//
// Parameters:
//   client_socket  connected socket descriptor; ownership passes to this
//                  function, which closes it before returning.
void handle_client(int client_socket) {
    static_assert(HEADER_SIZE == 1, "the header is decoded as a single length byte");

    // A one-byte header can announce up to 255 body bytes. The receive buffer
    // is sized for that worst case so an oversized body can be consumed in
    // full (keeping the stream in sync) without writing past the buffer.
    constexpr std::size_t kMaxAnnouncedBody = 255;
    char body_buffer[kMaxAnnouncedBody];

    std::string response;

    while (true) {  // runs until the client drops the connection
        std::uint8_t body_length_byte = 0;
        if (!read_exact(client_socket, &body_length_byte, HEADER_SIZE)) {
            std::cout << "Client disconnected." << std::endl;
            break;
        }
        const std::size_t body_length = body_length_byte;

        if (body_length == 0) {
            response = "Bad request!";
        } else {
            if (!read_exact(client_socket, body_buffer, body_length)) {
                std::cout << "Client disconnected while reading body." << std::endl;
                break;
            }

            if (body_length > static_cast<std::size_t>(BODY_BUFFER_SIZE)) {
                response = "Bad request!";
            } else {
                // The body is not NUL-terminated on the wire. Bound the text by
                // the received length, and stop at an embedded NUL if the
                // client sent one, instead of scanning past the received data.
                const std::size_t text_length = strnlen(body_buffer, body_length);
                response = processRequest_shared_mtx(std::string(body_buffer, text_length));
            }
        }

        // Count the request for the throughput monitor.
        request_count.fetch_add(1, std::memory_order_relaxed);

        if (!send_all(client_socket, response)) {
            std::cout << "Client disconnected while sending response." << std::endl;
            break;
        }
    }

    // Reached on every exit path, so the descriptor is never leaked.
    close(client_socket);
}

// Entry point: opens the listening socket on PORT, starts the throughput
// monitor thread and then accepts clients forever, serving each connection on
// its own detached thread running handle_client.
//
// Exits with EXIT_FAILURE when the listening socket cannot be created, bound
// or put into listening state. The accept loop itself never terminates.
int main() {
    const int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("Socket failed");
        exit(EXIT_FAILURE);
    }

    // Allow an immediate restart: without SO_REUSEADDR bind() fails with
    // "Address already in use" while old connections linger in TIME_WAIT.
    // Failure to set the option is not fatal.
    const int reuse = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
        perror("setsockopt(SO_REUSEADDR) failed");
    }

    sockaddr_in address{};  // zero-initialised, including the padding field
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);
    address.sin_port = htons(PORT);

    if (bind(server_fd, reinterpret_cast<sockaddr*>(&address), sizeof(address)) < 0) {
        perror("Bind failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    if (listen(server_fd, 5) < 0) {
        perror("Listen failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    std::cout << "Server listening on port " << PORT << std::endl;

    // Start the throughput monitor.
    std::thread(throughput_monitor).detach();

    while (true) {
        // accept() treats the length as an in/out argument, so it is set to
        // the buffer size before every call; the peer address gets its own
        // storage rather than overwriting the listening address.
        sockaddr_in peer{};
        socklen_t peer_length = sizeof(peer);
        const int client_socket =
            accept(server_fd, reinterpret_cast<sockaddr*>(&peer), &peer_length);

        if (client_socket < 0) {
            perror("Accept failed");
            continue;
        }

        std::cout << "New client connected" << std::endl;

        try {
            std::thread(handle_client, client_socket).detach();
        } catch (const std::system_error& error) {
            // Thread creation can fail under resource exhaustion; drop this
            // client instead of terminating the whole server.
            std::cerr << "Could not start client thread: " << error.what() << std::endl;
            close(client_socket);
        }
    }

    // Not reached while the accept loop has no exit condition.
    close(server_fd);
    return 0;
}