-
Saif Eddine Askri authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
server.cpp 9.00 KiB
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <system_error>
#include <thread>
#include <vector>
// TCP port the server listens on.
#define PORT 2525
// Size in bytes of the per-request body buffer declared in handle_client.
#define BODY_BUFFER_SIZE 64
// Number of header bytes read before each body; handle_client interprets the
// header byte as the length of the body that follows.
#define HEADER_SIZE 1
// Operation codes: the first character of a request selects the operation.
#define STORE_OPERATION 's'
#define GET_OPERATION 'g'
#define DELETE_OPERATION 'd'
// Exclusive lock taken by processRequest_slow and processRequest_unique_mutex
// around every access to DB.
std::mutex mtx;
// Reader/writer lock taken by processRequest_shared_mtx around accesses to DB.
// NOTE(review): mtx and shared_mtx are independent locks over the same map, so
// the handlers that use different locks must not be mixed in one running server.
std::shared_mutex shared_mtx;
// In-memory key/value store read and written by the processRequest_* handlers.
std::map<std::string, std::string> DB;
// Number of requests handled since throughput_monitor last reset it.
std::atomic<int> request_count(0);
/// Prints the number of requests counted in `request_count` once per second.
///
/// Runs an endless loop and never returns, so it is meant to be started on
/// its own thread.
void throughput_monitor() {
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        // exchange() reads and resets the counter in one atomic step. A
        // separate load() + store(0) would silently drop every request that
        // is counted between the two calls.
        const int handled = request_count.exchange(0);
        std::cout << "Anfragen pro Sekunde: " << handled << std::endl;
    }
}
//=========== SLOW PARSER ====================
/// A request split into its components.
struct Request {
    char operation = '\0';  ///< First character of the first token ('s', 'g' or 'd'); '\0' if absent.
    std::string key;        ///< Second token; empty if absent.
    std::string value;      ///< Third token, only filled for the store operation.
};

/// Splits a request of the form "<op>,<key>[,<value>];" into a Request.
///
/// Tokens are separated by ',' or ';'; empty tokens are skipped. Missing
/// tokens leave the corresponding Request fields at their defaults.
///
/// @param buffer NUL-terminated request text; may be nullptr.
/// @return A heap-allocated Request the caller must release with `delete`,
///         or nullptr when `buffer` is nullptr.
Request* parseRequest(const char* buffer) {
    if (buffer == nullptr) {
        return nullptr;
    }

    // The tokenizer writes NUL bytes into its input, so work on a private
    // copy. std::string and unique_ptr release their memory on every path,
    // including exceptions, so nothing can leak.
    std::string scratch(buffer);
    auto req = std::make_unique<Request>();

    // strtok_r keeps its position in `cursor` instead of hidden static state.
    // Plain strtok is not thread-safe, and this parser is reached from several
    // client threads before any lock is taken.
    char* cursor = nullptr;
    char* token = strtok_r(&scratch[0], ",;", &cursor);
    if (token == nullptr) {
        return req.release();
    }
    // First token: only its first character selects the operation.
    req->operation = token[0];

    // Second token: the key.
    token = strtok_r(nullptr, ",;", &cursor);
    if (token == nullptr) {
        return req.release();
    }
    req->key = token;

    // Third token: the value, which only the store operation carries.
    if (req->operation == 's') {
        token = strtok_r(nullptr, ",;", &cursor);
        if (token != nullptr) {
            req->value = token;
        }
    }
    return req.release();
}
// we assume that this function is slow
std::string processRequest_slow(const std::string& request) {
std::string response;
Request* parsedRequest = parseRequest(request.c_str());
std::lock_guard<std::mutex> lock(mtx);
switch (parsedRequest->operation) {
case STORE_OPERATION:
DB[parsedRequest->key] = parsedRequest->value;
response = "Stored [" + parsedRequest->key + "] = " + parsedRequest->value;
break;
case GET_OPERATION:
if (DB.find(parsedRequest->key) != DB.end()) {
response = "Value of [" + parsedRequest->key + "] is " + DB[parsedRequest->key];
} else {
response = "Key [" + parsedRequest->key + "] not found";
}
break;
case DELETE_OPERATION:
if (DB.erase(parsedRequest->key)) {
response = "Deleted key [" + parsedRequest->key + "]";
} else {
response = "Key [" + parsedRequest->key + "] not found for deletion";
}
break;
default:
response = "Invalid operation!";
break;
}
free(parsedRequest);
return response;
}
//=============================================
// Every operation (reads included) runs under the exclusive mutex `mtx`, so
// only one request touches DB at a time.
//
/// Executes one store/get/delete request against DB.
///
/// The request has the form "<op>,<key>[,<value>];". The operation is the
/// first character of the request; the key and optional value lie between the
/// first ',' and the first ';'.
///
/// @param request Raw request text.
/// @return A human-readable result message, "Invalid operation!" for an
///         unknown operation code, or "Bad request!" when the ',' / ';'
///         delimiters are missing or out of order.
std::string processRequest_unique_mutex(const std::string& request) {
    const size_t first = request.find(',');
    const size_t end = request.find(';');
    // The terminator must come after the first separator; otherwise the
    // length arithmetic below would underflow.
    if (first == std::string::npos || end == std::string::npos || first > end) {
        return "Bad request!";
    }

    const char operation = request[0];
    std::string key;
    std::string value;
    const size_t second = request.find(',', first + 1);
    // A comma that appears after the terminator is not part of this request.
    if (second != std::string::npos && second < end) {
        key = request.substr(first + 1, second - first - 1);
        value = request.substr(second + 1, end - second - 1);
    } else {
        key = request.substr(first + 1, end - first - 1);
    }

    std::string response;
    std::lock_guard<std::mutex> lock(mtx);
    switch (operation) {
        case STORE_OPERATION:
            DB[key] = value;
            response = "Stored [" + key + "] = " + value;
            break;
        case GET_OPERATION: {
            // Single lookup: reuse the iterator instead of find() + operator[].
            const auto it = DB.find(key);
            if (it != DB.end()) {
                response = "Value of [" + key + "] is " + it->second;
            } else {
                response = "Key [" + key + "] not found";
            }
            break;
        }
        case DELETE_OPERATION:
            if (DB.erase(key)) {
                response = "Deleted key [" + key + "]";
            } else {
                response = "Key [" + key + "] not found for deletion";
            }
            break;
        default:
            response = "Invalid operation!";
            break;
    }
    return response;
}
// Several threads may read concurrently, but only one thread writes at a time:
// GET takes a shared lock on `shared_mtx`, STORE and DELETE take an exclusive
// lock.
//
/// Executes one store/get/delete request against DB.
///
/// The request has the form "<op>,<key>[,<value>];". The operation is the
/// first character of the request; the key and optional value lie between the
/// first ',' and the first ';'.
///
/// @param request Raw request text.
/// @return A human-readable result message, "Invalid operation!" for an
///         unknown operation code, or "Bad request!" when the ',' / ';'
///         delimiters are missing or out of order.
std::string processRequest_shared_mtx(const std::string& request) {
    const size_t first = request.find(',');
    const size_t end = request.find(';');
    // The terminator must come after the first separator; otherwise the
    // length arithmetic below would underflow.
    if (first == std::string::npos || end == std::string::npos || first > end) {
        return "Bad request!";
    }

    const char operation = request[0];
    std::string key;
    std::string value;
    const size_t second = request.find(',', first + 1);
    // A comma that appears after the terminator is not part of this request.
    if (second != std::string::npos && second < end) {
        key = request.substr(first + 1, second - first - 1);
        value = request.substr(second + 1, end - second - 1);
    } else {
        key = request.substr(first + 1, end - first - 1);
    }

    std::string response;
    switch (operation) {
        case STORE_OPERATION: {
            std::unique_lock<std::shared_mutex> lock(shared_mtx);
            DB[key] = value;
            response = "Stored [" + key + "] = " + value;
            break;
        }
        case GET_OPERATION: {
            std::shared_lock<std::shared_mutex> lock(shared_mtx);
            // Only const access is allowed under a shared lock: read through
            // the iterator rather than the non-const map::operator[].
            const auto it = DB.find(key);
            if (it != DB.end()) {
                response = "Value of [" + key + "] is " + it->second;
            } else {
                response = "Key [" + key + "] not found";
            }
            break;
        }
        case DELETE_OPERATION: {
            std::unique_lock<std::shared_mutex> lock(shared_mtx);
            if (DB.erase(key)) {
                response = "Deleted key [" + key + "]";
            } else {
                response = "Key [" + key + "] not found for deletion";
            }
            break;
        }
        default:
            response = "Invalid operation!";
            break;
    }
    return response;
}
// Reads exactly `length` bytes from `fd` into `dest`.
// Returns false if the peer closed the connection or read() failed first.
static bool read_exact(int fd, char* dest, size_t length) {
    size_t done = 0;
    while (done < length) {
        const ssize_t got = read(fd, dest + done, length - done);
        if (got <= 0) {
            return false;
        }
        done += static_cast<size_t>(got);
    }
    return true;
}

// Sends all of `data` on `fd`, continuing after partial writes.
// Returns false if send() fails before everything was written.
static bool send_all(int fd, const std::string& data) {
    size_t sent = 0;
    while (sent < data.size()) {
        // MSG_NOSIGNAL: a peer that already hung up must produce an error
        // return here, not a SIGPIPE that would terminate the whole server.
        const ssize_t put = send(fd, data.data() + sent, data.size() - sent, MSG_NOSIGNAL);
        if (put <= 0) {
            return false;
        }
        sent += static_cast<size_t>(put);
    }
    return true;
}

/// Serves one client connection until the peer disconnects, then closes it.
///
/// Wire format per request: one header byte holding the body length, followed
/// by that many body bytes. Each request is answered with a text response.
/// Empty bodies and bodies longer than BODY_BUFFER_SIZE are answered with
/// "Bad request!".
///
/// @param client_socket Connected socket; this function takes ownership and
///        closes it before returning.
void handle_client(int client_socket) {
    static_assert(HEADER_SIZE == 1, "the header is a single length byte");

    std::string response;
    while (true) {  // serve requests until the client disconnects
        uint8_t body_length_byte = 0;
        if (!read_exact(client_socket, reinterpret_cast<char*>(&body_length_byte), HEADER_SIZE)) {
            std::cout << "Client disconnected." << std::endl;
            break;
        }
        const size_t body_length = body_length_byte;

        // Sized for the largest length a one-byte header can announce, plus a
        // terminating NUL, so no client-supplied length can overrun the
        // buffer. Oversized bodies are still read in full to keep the stream
        // framing in sync before they are rejected.
        char body_buffer[UINT8_MAX + 1] = {0};
        if (!read_exact(client_socket, body_buffer, body_length)) {
            std::cout << "Client disconnected while reading body." << std::endl;
            break;
        }

        if (body_length == 0 || body_length > static_cast<size_t>(BODY_BUFFER_SIZE)) {
            response = "Bad request!";
        } else {
            response = processRequest_shared_mtx(body_buffer);
        }

        // Count the request for the throughput monitor.
        request_count.fetch_add(1, std::memory_order_relaxed);

        if (!send_all(client_socket, response)) {
            std::cout << "Client disconnected while sending response." << std::endl;
            break;
        }
    }
    // Single exit path: the descriptor is released however the loop ended.
    close(client_socket);
}
/// Starts the TCP server: binds to PORT on all interfaces, starts the
/// throughput monitor, and serves every accepted client on its own detached
/// thread. Exits with EXIT_FAILURE if the listening socket cannot be set up;
/// otherwise the accept loop runs until the process is terminated.
int main() {
    const int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("Socket failed");
        exit(EXIT_FAILURE);
    }

    // Without SO_REUSEADDR a quick restart fails in bind() while connections
    // of the previous run are still in TIME_WAIT. Failure here is not fatal.
    const int enable = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) < 0) {
        perror("setsockopt(SO_REUSEADDR) failed");
    }

    sockaddr_in address{};  // zero-initialised, including the sin_zero padding
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);
    address.sin_port = htons(PORT);
    if (bind(server_fd, reinterpret_cast<sockaddr*>(&address), sizeof(address)) < 0) {
        perror("Bind failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }
    if (listen(server_fd, 5) < 0) {
        perror("Listen failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }
    std::cout << "Server listening on port " << PORT << std::endl;

    // Start the throughput monitor.
    std::thread(throughput_monitor).detach();

    while (true) {
        // The peer address is not used, so none is requested. This also
        // avoids reusing accept()'s value-result length argument across calls
        // and overwriting the listen address.
        const int client_socket = accept(server_fd, nullptr, nullptr);
        if (client_socket < 0) {
            perror("Accept failed");
            continue;
        }
        std::cout << "New client connected" << std::endl;
        try {
            std::thread(handle_client, client_socket).detach();
        } catch (const std::system_error& error) {
            // Thread creation can fail under resource exhaustion; drop this
            // client instead of terminating the server and leaking the socket.
            std::cerr << "Could not start client thread: " << error.what() << std::endl;
            close(client_socket);
        }
    }
    // Not reached: the accept loop only ends when the process is terminated.
}