diff --git a/CMakeLists.txt b/CMakeLists.txt index 4df006f..09a3462 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,5 +10,5 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -static -g -O2") add_subdirectory(${PROJECT_SOURCE_DIR}/include/ouc_server) -add_executable(test "${PROJECT_SOURCE_DIR}/examples/epoll_tcp_loop.cpp") +add_executable(test "${PROJECT_SOURCE_DIR}/examples/test_thread_pool.cpp") target_link_libraries(test PRIVATE ouc_server_lib) \ No newline at end of file diff --git a/examples/test_thread_pool.cpp b/examples/test_thread_pool.cpp new file mode 100644 index 0000000..364a371 --- /dev/null +++ b/examples/test_thread_pool.cpp @@ -0,0 +1,21 @@ +#include + +int main() +{ + using namespace ouc_server::utils; + ThreadPool pool(4); + + auto f1 = pool.sumbit( + []() + { puts("Hello world from thread pool!"); }); + + auto f2 = pool.sumbit( + [](int a, int b) + { + return a + b; + }, + 1, 2); + + f1.get(); + printf("f2 result: %d\n", f2.get()); +} \ No newline at end of file diff --git a/include/ouc_server/utils/thread_pool.cpp b/include/ouc_server/utils/thread_pool.cpp new file mode 100644 index 0000000..3f4e72a --- /dev/null +++ b/include/ouc_server/utils/thread_pool.cpp @@ -0,0 +1,56 @@ +#include + +namespace ouc_server +{ + namespace utils + { + ThreadPool::ThreadPool(size_t n) + : is_stop(false) + { + for (size_t idx = 0; idx < n; ++idx) + { + workers.reserve(n); + workers.emplace_back( + [this]() + { + while (true) + { + std::function task; + + { + std::unique_lock lk(this->mtx); + + this->cv.wait( + lk, + [this] + { return this->is_stop || (!this->tasks.empty()); }); + + if (this->is_stop && this->tasks.empty()) + return; + + task = std::move(this->tasks.front()); + this->tasks.pop(); + } + + task(); + } + return; + }); + } + } + + ThreadPool::~ThreadPool() + { + { + std::lock_guard lk(mtx); + is_stop = true; + } + + cv.notify_all(); + + for (auto &worker : workers) + if (worker.joinable()) + worker.join(); + } + } +} \ No newline at end of file diff --git a/include/ouc_server/utils/thread_pool.hpp b/include/ouc_server/utils/thread_pool.hpp new file mode 100644 index 0000000..f2ca87d --- /dev/null +++ b/include/ouc_server/utils/thread_pool.hpp @@ -0,0 +1,63 @@ +#ifndef INCLUDE_OUC_SERVER_THREAD_POOL +#define INCLUDE_OUC_SERVER_THREAD_POOL + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ouc_server +{ + namespace utils + { + class ThreadPool + { + private: + std::mutex mtx; + bool is_stop; + std::condition_variable cv; + + std::vector workers; + std::queue> tasks; + + public: + ThreadPool(size_t); + ~ThreadPool(); + + public: + template + auto sumbit(Func &&func, Args &&...args) + -> std::future> + { + using Ret = typename std::invoke_result_t; + + auto task_ptr = std::make_shared< + std::packaged_task>( + std::bind(std::forward(func), std::forward(args)...)); + + std::future res = task_ptr->get_future(); + { + std::unique_lock lk(mtx); + + if (is_stop) + throw std::runtime_error("ThreadPool has been stopped"); + + tasks.emplace( + [task_ptr]() + { (*task_ptr)(); }); + } + cv.notify_one(); + + return res; + } + }; + } +} + +#endif // INCLUDE_OUC_SERVER_THREAD_POOL \ No newline at end of file