스레드가 있고 부스트 링 버퍼를 사용하는 생산자 소비자

두 개의 스레드가 있는데 하나는 생산자이고 다른 하나는 소비자입니다. 내 소비자는 항상 늦었 기 때문에 (수면을 사용하여 아래 코드에서 시뮬레이션 된 비용이 많이 드는 함수 호출로 인해) 일부 이벤트를 느슨하게 할 수 있으므로 링 버퍼를 사용했습니다.

잠금이 괜찮아 보이는지 확인하고 일반적인 C ++ 검토 주석을 찾고 있습니다.

#include <iostream> #include <thread> #include <chrono> #include <vector> #include <atomic> #include <boost/circular_buffer.hpp> #include <condition_variable> #include <functional> std::atomic<bool> mRunning; std::mutex m_mutex; std::condition_variable m_condVar; class VecBuf { private: std::vector<int8_t> vec; public: VecBuf() = default; VecBuf(std::vector<int8_t> v) { vec = v; } }; std::vector<int8_t> data{ 10, 20, 30 }; class Detacher { public: template<typename Function, typename ... Args> void createTask(Function &&func, Args&& ... args) { m_threads.emplace_back(std::forward<Function>(func), std::forward<Args>(args)...); } Detacher() = default; Detacher(const Detacher&) = delete; Detacher & operator=(const Detacher&) = delete; Detacher(Detacher&&) = default; Detacher& operator=(Detacher&&) = default; ~Detacher() { for (auto& thread : m_threads) { thread.join(); } } private: std::vector<std::thread> m_threads; }; void foo_1(boost::circular_buffer<VecBuf> *cb) { while (mRunning) { std::unique_lock<std::mutex> mlock(m_mutex); m_condVar.wait(mlock, [=]() { return !cb->empty(); }); VecBuf local_data(cb->front()); cb->pop_front(); mlock.unlock(); if (!mRunning) { break; } //simulate time consuming function call and consume local_data here std::this_thread::sleep_for(std::chrono::milliseconds(16)); } while (cb->size()) { VecBuf local_data(cb->front()); cb->pop_front(); if (!mRunning) { break; } } } void foo_2(boost::circular_buffer<VecBuf> *cb) { while (mRunning) { std::unique_lock<std::mutex> mlock(m_mutex); while (cb->full()) { mlock.unlock(); /* can we do better than this? */ std::this_thread::sleep_for(std::chrono::milliseconds(100)); mlock.lock(); } cb->push_back(VecBuf(data)); m_condVar.notify_one(); } } int main() { mRunning = true; boost::circular_buffer<VecBuf> cb(100); Detacher thread_1; thread_1.createTask(foo_1, &cb); Detacher thread_2; thread_2.createTask(foo_2, &cb); std::this_thread::sleep_for(std::chrono::milliseconds(20000)); mRunning = false; } 

답변

 /* can we do better than this? */ 

순환 버퍼 컨텍스트에서 바쁜 대기를 방지하는 표준 방법은 두 개의 세마포를 갖는 것입니다. 첫 번째는 버퍼가 꽉 찼을 때 생산자를 차단하고 두 번째는 버퍼가 비어있을 때 소비자를 차단하는 것입니다. 프로세스가 세마포어를 통과하고 작업을 수행하면 피어에 신호를 보내야합니다.


소비자가 가끔 늦을 때 순환 버퍼가 좋습니다. 잃어버린 데이터를 감당할 수 있습니다. 귀하의 상황에서는 잘못된 해결책처럼 보입니다. 생산자는 소비율에 의해 제한되고 소비자는 오래된 데이터를 제공받습니다.

일반적인 대답은 생산자가 최고 속도로 실행되도록하는 것입니다. , 생산을 삼중 버퍼링합니다 (적어도 소비자가 가장 최근에 생산 된 데이터를 얻을 수 있음을 보장합니다). 뻔뻔한 자체 홍보 를 용서해주세요.

댓글

  • 검토해 주셔서 감사합니다. 댓글과 링크. 그러나 제 경우에는 소비자가 얼마 동안 만 차단합니다. 조건 변수 + 잠금을 사용하여 현재 솔루션 대신 세마포어를 사용하는 것이 여전히 타당하다고 생각합니다.
  • @nomanpouigt " 가끔 "는 " 항상 늦습니다 "와 상당히 다릅니다.
  • 죄송합니다. 링 버퍼를 의미합니다. 크기는 지연을 수용 할 수있는 방식으로 선택됩니다. C ++에서는 세마포어가 없으며 뮤텍스와 조건 변수를 사용하여 구현되었으므로 세마포어를 사용하는 것이 합리적입니다.
  • 또한 삼중 버퍼 솔루션을 자세히 설명해 주시고 제 경우에는 어떻게 적용 할 수 있습니까? ? ' 생산자 측에서 잠금을 수행하지 않고 실행하도록 허용하고 생산자 버퍼를 삼중 버퍼링해야 소비자가 오버런시 최신 데이터를 얻을 수 있다는 의미입니까?

li>

답글 남기기

이메일 주소를 발행하지 않을 것입니다. 필수 항목은 *(으)로 표시합니다