diff --git a/.vscode/settings.json b/.vscode/settings.json index 7ffe4f96eb86a7d150b47def95cd3771e2fb2f5b..6e93caa6f1af8d2d74a7bb1beb51f28da08f1858 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -57,6 +57,9 @@ "streambuf": "cpp", "cinttypes": "cpp", "typeinfo": "cpp", - "variant": "cpp" + "variant": "cpp", + "semaphore": "cpp", + "stop_token": "cpp", + "thread": "cpp" } } \ No newline at end of file diff --git a/prak3/readme.md b/prak3/readme.md new file mode 100644 index 0000000000000000000000000000000000000000..44cc43adce4b3694527f8e275ae23a8b619a59c0 --- /dev/null +++ b/prak3/readme.md @@ -0,0 +1,68 @@ +# DataTransfer ๐๐ + +A simple C++ program for **efficient block-wise reading of large files** and outputting their content to standard output. +It utilizes `fread` to read data in configurable block sizes, which can be set via command-line arguments. + +--- + +## ๐ Installation & Execution + +### ๐ง Build the Project +Before running the program, ensure that all necessary scripts are executable: +```sh +chmod +x *.sh +./build_project.sh +``` + +### ๐ Run Server +Use the following command to execute the server: +```sh +./build/src/server +``` +Use the following command to execute the client: +```sh +./build/src/client +``` + +--- + + +## ๐ Performance Testing (EDIT THIT ) + +The following scripts measure execution time and generate profiling data for different scenarios: + +### โณ Execution Time Measurement + +- **With socket output:** + ```sh + ./exec_time.sh socket + ``` +- **With pipe output:** + ```sh + ./exec_time.sh pipe + ``` +- **With shared_memory output:** + ```sh + ./exec_time.sh shared_memory + ``` + +### ๐ฅ Stack Trace & Profiling (Perf + Flamegraphs) + +- **With socket output:** + ```sh + ./stack_trace_with_perf.sh socket + ``` +- **With pipe output:** + ```sh + ./stack_trace_with_perf.sh pipe + ``` +- **With shared_memory output:** + ```sh + ./stack_trace_with_perf.sh shared_memory + ``` +--- + +## ๐ Performance Analysis +These tests help analyze performance differences when using sockets, pipes and shared memory. +Flamegraphs provide a visualization of CPU usage and execution bottlenecks. ๐ + diff --git a/prak3/src/client.cpp b/prak3/src/client.cpp index b046262774cd8857f51c1259b7161ac5ca5685a3..23491602f09365a727227fe350ac6aab73e0c251 100644 --- a/prak3/src/client.cpp +++ b/prak3/src/client.cpp @@ -5,10 +5,39 @@ #define SERVER_IP "127.0.0.1" #define PORT 8080 - #define BUFFER_SIZE 1024 -int main() { +std::string createMessage(const std::string& operation, const std::string& key, const std::string& value = "") { + if (operation == "store") { + return "s" "," + key + "," + value + ";"; + } else if (operation == "get") { + return ("g" + "," + key + ";"; + } else if (operation == "delete") { + return "d" + "," + key + ";"; + } + else { + return ""; + } +} + +int main(int argc, char* argv[]) { + + if (argc < 3 || argc > 4) { + std::cerr << "Usage: " << argv[0] << " <operation> <key> [value]" << std::endl; + return 1; + } + + std::string operation = argv[1]; + std::string key = argv[2]; + std::string value = (argc == 4) ? argv[3] : ""; + + // Nachricht basierend auf den Argumenten erstellen + std::string message = createMessage(operation, key, value); + if (message.empty()) { + std::cerr << "Invalid operation! Use 'store', 'get', or 'delete'." << std::endl; + return 1; + } + int sock; struct sockaddr_in server_address; char buffer[BUFFER_SIZE] = {0}; @@ -23,7 +52,7 @@ int main() { // Serveradresse konfigurieren server_address.sin_family = AF_INET; server_address.sin_port = htons(PORT); - + // IP-Adresse konvertieren if (inet_pton(AF_INET, SERVER_IP, &server_address.sin_addr) <= 0) { perror("Invalid address"); @@ -37,8 +66,7 @@ int main() { } // Nachricht senden - const char* message = "Hello"; - send(sock, message, strlen(message), 0); + send(sock, message.c_str(), message.size(), 0); std::cout << "Message sent: " << message << std::endl; // Antwort empfangen diff --git a/prak3/src/server.cpp b/prak3/src/server.cpp index b983cc279540c63a6592a02d05bdb1d61188cf24..e2f3c1a2edc76dc1cc1a7baf9d819752b367b640 100644 --- a/prak3/src/server.cpp +++ b/prak3/src/server.cpp @@ -1,79 +1,176 @@ #include <iostream> -#include <thread> +#include <unordered_map> #include <vector> +#include <thread> +#include <mutex> +#include <condition_variable> +#include <queue> #include <cstring> #include <unistd.h> #include <arpa/inet.h> +#include <sys/epoll.h> #define PORT 8080 +#define MAX_EVENTS 10 #define BUFFER_SIZE 1024 +#define THREAD_POOL_SIZE 4 -void handle_client(int client_socket) { - char buffer[BUFFER_SIZE] = {0}; +std::unordered_map<std::string, std::string> store; +std::mutex store_mutex; - // Nachricht vom Client empfangen - ssize_t bytes_received = read(client_socket, buffer, BUFFER_SIZE); - if (bytes_received > 0) { - std::cout << "Client: " << buffer << std::endl; +// Task-Queue fรผr Thread-Pool +std::queue<int> task_queue; +std::mutex queue_mutex; +std::condition_variable cv; - // Antwort an den Client senden - const char* response = "Hi"; - send(client_socket, response, strlen(response), 0); - } +// Worker-Funktion fรผr Thread-Pool +void worker_thread() { + while (true) { + int client_socket; + { + std::unique_lock<std::mutex> lock(queue_mutex); + cv.wait(lock, [] { return !task_queue.empty(); }); + client_socket = task_queue.front(); + task_queue.pop(); + } - // Socket schlieรen - close(client_socket); + char buffer[BUFFER_SIZE] = {0}; + ssize_t bytes_received = read(client_socket, buffer, BUFFER_SIZE); + + if (bytes_received <= 0) { + close(client_socket); + continue; + } + + std::string request(buffer); + if (request.back() != ';') { + std::cerr << "Invalid request format" << std::endl; + continue; + } + + std::string response; + char operation = request[0]; + size_t first_comma = request.find(',', 1); + if (first_comma == std::string::npos) { + response = "ERROR: Invalid format;"; + } else { + std::string key = request.substr(2, first_comma - 2); + if (operation == 's') { // Store operation + size_t second_comma = request.find(',', first_comma + 1); + if (second_comma == std::string::npos) { + response = "ERROR: Invalid format;"; + } else { + std::string value = request.substr(first_comma + 1, second_comma - first_comma - 1); + std::lock_guard<std::mutex> lock(store_mutex); + store[key] = value; + response = "STORED;"; + } + } else if (operation == 'g') { // Get operation + std::lock_guard<std::mutex> lock(store_mutex); + auto it = store.find(key); + if (it != store.end()) { + response = "VALUE," + it->second + ";"; + } else { + response = "NOT_FOUND;"; + } + } else if (operation == 'd') { // Delete operation + std::lock_guard<std::mutex> lock(store_mutex); + if (store.erase(key) > 0) { + response = "DELETED;"; + } else { + response = "NOT_FOUND;"; + } + } else { + response = "ERROR: Unknown operation;"; + } + } + send(client_socket, response.c_str(), response.length(), 0); + } } int main() { - int server_fd, client_socket; - struct sockaddr_in address; + int server_fd, epoll_fd; + struct sockaddr_in address{}; socklen_t addrlen = sizeof(address); - std::vector<std::thread> threads; - // Socket erstellen + // Server-Socket erstellen server_fd = socket(AF_INET, SOCK_STREAM, 0); if (server_fd == -1) { perror("Socket failed"); exit(EXIT_FAILURE); } - // Serveradresse festlegen + int opt = 1; + setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons(PORT); - // Socket an Port binden if (bind(server_fd, (struct sockaddr*)&address, sizeof(address)) < 0) { perror("Bind failed"); exit(EXIT_FAILURE); } - // Server lauscht auf eingehende Verbindungen - if (listen(server_fd, 5) < 0) { + if (listen(server_fd, SOMAXCONN) < 0) { perror("Listen failed"); exit(EXIT_FAILURE); } - std::cout << "Server listening on port " << PORT << std::endl; + // Epoll erstellen + epoll_fd = epoll_create1(0); + if (epoll_fd == -1) { + perror("Epoll create failed"); + exit(EXIT_FAILURE); + } + + struct epoll_event event{}; + event.events = EPOLLIN; + event.data.fd = server_fd; + + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) { + perror("Epoll control failed"); + exit(EXIT_FAILURE); + } + + std::vector<std::thread> thread_pool; + for (int i = 0; i < THREAD_POOL_SIZE; i++) { + thread_pool.emplace_back(worker_thread); + } + + struct epoll_event events[MAX_EVENTS]; while (true) { - client_socket = accept(server_fd, (struct sockaddr*)&address, &addrlen); - - if (client_socket < 0) { - perror("Accept failed"); - continue; + int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); + if (num_events == -1) { + perror("Epoll wait failed"); + exit(EXIT_FAILURE); } - std::cout << "New client connected" << std::endl; - - // Starte neuen Thread fรผr den Client - threads.emplace_back(handle_client, client_socket); - } + for (int i = 0; i < num_events; i++) { + if (events[i].data.fd == server_fd) { + int client_socket = accept(server_fd, (struct sockaddr*)&address, &addrlen); + if (client_socket < 0) { + perror("Accept failed"); + continue; + } + + struct epoll_event client_event{}; + client_event.events = EPOLLIN; + client_event.data.fd = client_socket; - // Alle Threads beenden - for (auto& t : threads) { - t.join(); + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_socket, &client_event) == -1) { + perror("Epoll add client failed"); + close(client_socket); + } + } else { + { + std::lock_guard<std::mutex> lock(queue_mutex); + task_queue.push(events[i].data.fd); + } + cv.notify_one(); + } + } } close(server_fd);