-
Notifications
You must be signed in to change notification settings - Fork 1
/
threadpool.hpp
87 lines (73 loc) · 1.84 KB
/
threadpool.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#pragma once
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
#include <boost/asio/io_service.hpp>
namespace util
{
class CThreadPool
{
public:
explicit CThreadPool( size_t worker_count = std::thread::hardware_concurrency() + 1 )
: io_service_(), placeholder_work_( io_service_ ), threads_(), semaphore_( worker_count )
{
for ( size_t i = 0; i < worker_count; ++i )
threads_.emplace_back( std::bind<size_t ( boost::asio::io_service::* )(), boost::asio::io_service*>(
&boost::asio::io_service::run, &io_service_ ) );
}
template <class F>
std::shared_future<typename std::result_of<F()>::type> PutTask( F&& f )
{
using result_t = typename std::result_of<F()>::type;
semaphore_.wait();
auto task = std::make_shared<std::packaged_task<result_t()>>( f );
std::shared_future<result_t> fut( task->get_future() );
io_service_.post( [task, this] {
( *task )();
semaphore_.notify();
} );
return fut;
}
virtual ~CThreadPool()
{
io_service_.stop();
for ( auto& thread : threads_ )
{
if ( thread.joinable() )
thread.join();
}
}
private:
class CSemaphore
{
public:
explicit CSemaphore( int count = 0 ) : count_( count )
{
}
inline void notify()
{
std::unique_lock<std::mutex> lock( mtx );
count_++;
condition_variable_.notify_one();
}
inline void wait()
{
std::unique_lock<std::mutex> lock( mtx );
while ( count_ == 0 )
{
condition_variable_.wait( lock );
}
count_--;
}
private:
std::mutex mtx;
std::condition_variable condition_variable_;
int count_;
};
boost::asio::io_service io_service_;
boost::asio::io_service::work placeholder_work_;
std::vector<std::thread> threads_;
CSemaphore semaphore_;
};
} // ns util