feat: 实现最简单线程池并通过测试

This commit is contained in:
2025-09-28 13:20:52 +00:00
parent 648ed1575a
commit 8d12ce73a0
4 changed files with 141 additions and 1 deletions
+1 -1
View File
@@ -10,5 +10,5 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -static -g -O2")
add_subdirectory(${PROJECT_SOURCE_DIR}/include/ouc_server)
add_executable(test "${PROJECT_SOURCE_DIR}/examples/epoll_tcp_loop.cpp")
add_executable(test "${PROJECT_SOURCE_DIR}/examples/test_thread_pool.cpp")
target_link_libraries(test PRIVATE ouc_server_lib)
+21
View File
@@ -0,0 +1,21 @@
#include <utils/thread_pool.hpp>
int main()
{
using namespace ouc_server::utils;
ThreadPool pool(4);
auto f1 = pool.sumbit(
[]()
{ puts("Hello world from thread pool!"); });
auto f2 = pool.sumbit(
[](int a, int b)
{
return a + b;
},
1, 2);
f1.get();
printf("f2 result: %d\n", f2.get());
}
+56
View File
@@ -0,0 +1,56 @@
#include <utils/thread_pool.hpp>
namespace ouc_server
{
namespace utils
{
ThreadPool::ThreadPool(size_t n)
: is_stop(false)
{
for (size_t idx = 0; idx < n; ++idx)
{
workers.reserve(n);
workers.emplace_back(
[this]()
{
while (true)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lk(this->mtx);
this->cv.wait(
lk,
[this]
{ return this->is_stop || (!this->tasks.empty()); });
if (this->is_stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
return;
});
}
}
ThreadPool::~ThreadPool()
{
{
std::lock_guard<std::mutex> lk(mtx);
is_stop = true;
}
cv.notify_all();
for (auto &worker : workers)
if (worker.joinable())
worker.join();
}
}
}
+63
View File
@@ -0,0 +1,63 @@
#ifndef INCLUDE_OUC_SERVER_THREAD_POOL
#define INCLUDE_OUC_SERVER_THREAD_POOL
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>
#include <future>
#include <functional>
#include <type_traits>
#include <memory>
#include <utility>
namespace ouc_server
{
namespace utils
{
class ThreadPool
{
private:
std::mutex mtx;
bool is_stop;
std::condition_variable cv;
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
public:
ThreadPool(size_t);
~ThreadPool();
public:
template <typename Func, typename... Args>
auto sumbit(Func &&func, Args &&...args)
-> std::future<typename std::invoke_result_t<Func, Args...>>
{
using Ret = typename std::invoke_result_t<Func, Args...>;
auto task_ptr = std::make_shared<
std::packaged_task<Ret()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
std::future<Ret> res = task_ptr->get_future();
{
std::unique_lock<std::mutex> lk(mtx);
if (is_stop)
throw std::runtime_error("ThreadPool has been stopped");
tasks.emplace(
[task_ptr]()
{ (*task_ptr)(); });
}
cv.notify_one();
return res;
}
};
}
}
#endif // INCLUDE_OUC_SERVER_THREAD_POOL