Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include "semaphore.hpp"
/**
* @brief A collection of worker threads that can work on tasks in a task queue.
* This allows for using threads to parallelize tasks while only having a fixed
* number of reusable threads.
*
* This highly reduces overhead that would come with creating new threads and can
* also prevent the "too many threads" problem. There is no point in having more
* actively working threads than cpu cores since the cpu can't parallelize the
* work further.
*/
class Threadpool
{
private:
/**
* @brief Indicated if a shutdown was initiated and the threadpool can no longer
* accept new tasks.
*/
bool shutdownInitiated = false;
/**
* @brief The number of concurrent workers in the threadpool
*/
int numberOfWorkers;
/**
* @brief The list of thread handles for the concurrent workers
*/
std::vector<std::thread> workerThreads;
/**
* @brief A task queue that contains functions that will be executed by the
* workers threads in the same order as they were added (fifo).
*
* NOTE: The access is not synchronized by default and the mtxTaskQueue mutex
* must be used to access this variable.
*/
std::queue <
std::function<void ()>
> taskQueue;
/**
* @brief Mutex to synchronize access to the taskQueue variable.
*/
std::mutex mtxTaskQueue;
/**
* @brief This semaphore is used to notify the workers that there are tasks available.
* The value represents te number of open tasks in the taskQueue. Workers will
* therefore wait if there are no tasks and continue if tasks get added to the
* taskQueue.
*
* The semaphore has is posted when tasks are added to the taskQueue.
*/
Semaphore semWaitForTask;
/**
* @brief The maximum number of open tasks in the taskQueue. If this number
* is reached, the addTask() method will block.
*/
int maxQueueBacklog;
/**
* @brief This semaphore is used to limit the number of open tasks on the taskQueue.
*
* It is initialized with the max number of open tasks (maxQueueBacklog) and
* the addTask function waits on the semaphore while the workers post it
* after removing a task from the taskQueue.
*/
Semaphore semTaskQueueLimit;
/**
* @brief Internal worker function that waits for and then completes tasks from
* the taskQueue. This is the function that is executed by the worker threads.
*
* @param workerId The id of the current worker
*/
void workLoop(int workerId);
public:
/**
* @brief Automatically determine the number of threads to use in the threadpool.
* This will use (number of logical cores / 2) as the number of threads.
*/
static const int AUTO_NO_WORKERS = 0;
/**
* @brief Allow an unlimited queue size and never block when adding tasks.
*/
static const int DISABLE_MAX_QUEUE_BACKLOG = 0;
/**
* @brief Create a new Threadpool with the specified number of worker threads
* and maximum queue backlog.
*
* The number of worker threads should not be higher than the number of physical
* cores. In certain unlikely circumstances it might exist a small performance
* increase by setting it to the number of logical cores (cpu "threads" with
* hyperthreading), but this will not be true in almost all cases.
*
* @param numberOfWorkers The number of worker threads to use for the threadpool.
*
* @param maxQueueBacklog The number of open tasks in the task queue before the
* addTask method will block.
*/
Threadpool(int numberOfWorkers = AUTO_NO_WORKERS, int maxQueueBacklog = DISABLE_MAX_QUEUE_BACKLOG);
/**
* @brief Add a task to the task queue to be processed by the threadpool.
* If the number of open tasks in the threadpool is equal to the value of
* maxQueueBacklog this function will block until tasks are removed from the
* queue by the worker threads.
*
* @param task The task function to be executed by the threadpool.
*
* @warning This function will block if the number of open tasks in the
* threadpool is equal to the value of maxQueueBacklog this function will
* block until tasks are removed from the queue by the worker threads.
*/
void addTask(std::function<void ()> task);
/**
* @brief Block until all of the worker threads have ended.
* Note that the worker threads will not end on their own unless shutdown
* is called.
*/
void joinAll();
/**
* @brief Initiate a shutdown of the threadpool. The worker threads will continue
* to finish all open tasks and then terminate.
*/
void shutdown();
};
#endif // _THREADPOOL_H