From 648f72cb60836ca27032f4d46ad201d211645f07 Mon Sep 17 00:00:00 2001 From: pjh456 <147148383@qq.com> Date: Tue, 30 Sep 2025 14:39:59 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E4=BA=86=20MPMC=20?= =?UTF-8?q?=E6=97=A0=E9=94=81=E7=8E=AF=E5=BD=A2=E9=98=9F=E5=88=97=E5=B9=B6?= =?UTF-8?q?=E9=80=9A=E8=BF=87=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 2 +- examples/test_mpmc_queue.cpp | 71 ++++++++++ include/ouc_server/utils/mpmc_queue.hpp | 166 ++++++++++++++++++++++++ 3 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 examples/test_mpmc_queue.cpp create mode 100644 include/ouc_server/utils/mpmc_queue.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index fc27440..8c9a99b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,5 +10,5 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -static -g -O2") add_subdirectory(${PROJECT_SOURCE_DIR}/include/ouc_server) -add_executable(test "${PROJECT_SOURCE_DIR}/examples/test_tcp_server.cpp") +add_executable(test "${PROJECT_SOURCE_DIR}/examples/test_mpmc_queue.cpp") target_link_libraries(test PRIVATE ouc_server_lib) \ No newline at end of file diff --git a/examples/test_mpmc_queue.cpp b/examples/test_mpmc_queue.cpp new file mode 100644 index 0000000..0481c5a --- /dev/null +++ b/examples/test_mpmc_queue.cpp @@ -0,0 +1,71 @@ +#include + +#include +#include +#include +#include +#include +#include + +constexpr size_t PRODUCER_COUNT = 4; +constexpr size_t CONSUMER_COUNT = 4; +constexpr size_t ITEMS_PER_PRODUCER = 100; + +int main() +{ + pjh_std::MPMCQueue queue(1024); + + // 生产者线程 + std::vector producers; + for (size_t i = 0; i < PRODUCER_COUNT; ++i) + { + producers.emplace_back( + [i, &queue]() + { + for (size_t j = 0; j < ITEMS_PER_PRODUCER; ++j) { + // 非阻塞 push,失败就重试 + while (!queue.push(static_cast(i * ITEMS_PER_PRODUCER + j))) { + std::this_thread::yield(); + } + } }); + } + + // 消费者线程 + std::vector consumers; + std::atomic consumed{0}; + for (size_t i = 0; i < CONSUMER_COUNT; ++i) + { + consumers.emplace_back( + [&queue, &consumed]() + { + while (true) { + auto val = queue.pop(); + if (val) { + consumed.fetch_add(1, std::memory_order_relaxed); + } else { + // 队列空,检查是否已经完成 + if (consumed.load(std::memory_order_relaxed) >= PRODUCER_COUNT * ITEMS_PER_PRODUCER) + break; + std::this_thread::yield(); + } + } }); + } + + for (auto &t : producers) + if (t.joinable()) + t.join(); + + for (auto &t : consumers) + if (t.joinable()) + t.join(); + + std::cout << "Total produced: " << PRODUCER_COUNT * ITEMS_PER_PRODUCER << "\n"; + std::cout << "Total consumed: " << consumed.load() << "\n"; + + if (consumed.load() == PRODUCER_COUNT * ITEMS_PER_PRODUCER) + std::cout << "Test passed.\n"; + else + std::cout << "Test failed.\n"; + + return 0; +} \ No newline at end of file diff --git a/include/ouc_server/utils/mpmc_queue.hpp b/include/ouc_server/utils/mpmc_queue.hpp new file mode 100644 index 0000000..c204bc4 --- /dev/null +++ b/include/ouc_server/utils/mpmc_queue.hpp @@ -0,0 +1,166 @@ +#ifndef INCLUDE_MPMC_QUEUE +#define INCLUDE_MPMC_QUEUE + +#include +#include +#include +#include +#include +#include + +namespace pjh_std +{ + template + class MPMCQueue + { + private: + struct Cell + { + std::atomic sequence; + typename std::aligned_storage::type storage; + }; + Cell *m_buffer; + + size_t m_capacity; + std::atomic m_head, m_tail; + size_t mask; + + public: + explicit MPMCQueue(size_t p_capacity) + : m_capacity(2), + m_buffer(nullptr) + { + while (m_capacity < p_capacity) + m_capacity <<= 1; + mask = m_capacity - 1; + + m_buffer = new Cell[m_capacity](); + for (size_t idx = 0; idx < m_capacity; ++idx) + m_buffer[idx].sequence.store(idx, std::memory_order_relaxed); + + m_head.store(0, std::memory_order_relaxed); + m_tail.store(0, std::memory_order_relaxed); + } + + MPMCQueue(const MPMCQueue &) = delete; + MPMCQueue &operator=(const MPMCQueue &) = delete; + + MPMCQueue(MPMCQueue &&other) noexcept + : m_capacity(other.m_capacity), + m_buffer(other.m_buffer), + m_head(other.m_head), + m_tail(other.m_tail) + { + other.m_capacity = 0; + other.m_buffer = nullptr; + } + MPMCQueue &operator=(MPMCQueue &&other) noexcept + { + if (this == &other) + return *this; + + m_capacity = other.m_capacity, other.m_capacity = 0; + m_buffer = other.m_buffer, other.m_buffer = nullptr; + m_head = other.m_head; + m_tail = other.m_tail; + + return *this; + } + + ~MPMCQueue() + { + for (size_t i = 0; i < m_capacity; ++i) + { + size_t seq = m_buffer[i].sequence.load(std::memory_order_relaxed); + if (seq <= m_tail) + { + T *ptr = reinterpret_cast(&m_buffer[i].storage); + ptr->~T(); + } + } + delete[] m_buffer; + } + + public: + bool push(const T &val) { return push_impl(val); } + bool push(T &&val) { return push_impl(std::move(val)); } + + std::optional pop() + { + Cell *cell; + size_t pos = m_head.load(std::memory_order_relaxed); + + while (true) + { + cell = &m_buffer[pos & mask]; + size_t seq = cell->sequence.load(std::memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)(pos + m_capacity); + + if (diff == 0) + { + if (m_head.compare_exchange_weak( + pos, + pos + 1, + std::memory_order_acq_rel, + std::memory_order_relaxed)) + break; + } + else if (diff < 0) + return std::nullopt; + else + pos = m_head.load(std::memory_order_relaxed); + } + + T *ptr = reinterpret_cast(&cell->storage); + T result = std::move(*ptr); + ptr->~T(); + + cell->sequence.store(pos + m_capacity, std::memory_order_release); + return result; + } + + public: + template + bool push_impl(U &&val) + { + Cell *cell; + size_t pos = m_tail.load(std::memory_order_relaxed); + + while (true) + { + cell = &m_buffer[pos & mask]; + size_t seq = cell->sequence.load(std::memory_order_acquire); + intptr_t diff = (intptr_t)seq - (intptr_t)pos; + + if (diff == 0) + { + if (m_tail.compare_exchange_weak( + pos, + pos + 1, + std::memory_order_acq_rel, + std::memory_order_relaxed)) + break; + } + else if (diff < 0) + return false; + else + pos = m_tail.load(std::memory_order_relaxed); + } + + try + { + new (&cell->storage) U(std::forward(val)); + cell->sequence.store(pos + m_capacity, std::memory_order_release); + } + catch (...) + { + cell->sequence.store(pos, std::memory_order_release); + throw; + } + + return true; + } + }; +} + +#endif \ No newline at end of file