スレッドを持ち、ブーストリングバッファを使用しているプロデューサーコンシューマー

2つのスレッドがあり、1つはプロデューサーで、もう1つはコンシューマーです。私のコンシューマーは常に遅れています(いくつかのコストのかかる関数呼び出しのため、スリープを使用して以下のコードでシミュレートされています)。そのため、いくつかのイベントを失う余裕があるため、リングバッファーを使用しました。

ロックが正常に見えるかどうかを確認し、一般的なC ++レビューコメントを探しています。

#include <iostream> #include <thread> #include <chrono> #include <vector> #include <atomic> #include <boost/circular_buffer.hpp> #include <condition_variable> #include <functional> std::atomic<bool> mRunning; std::mutex m_mutex; std::condition_variable m_condVar; class VecBuf { private: std::vector<int8_t> vec; public: VecBuf() = default; VecBuf(std::vector<int8_t> v) { vec = v; } }; std::vector<int8_t> data{ 10, 20, 30 }; class Detacher { public: template<typename Function, typename ... Args> void createTask(Function &&func, Args&& ... args) { m_threads.emplace_back(std::forward<Function>(func), std::forward<Args>(args)...); } Detacher() = default; Detacher(const Detacher&) = delete; Detacher & operator=(const Detacher&) = delete; Detacher(Detacher&&) = default; Detacher& operator=(Detacher&&) = default; ~Detacher() { for (auto& thread : m_threads) { thread.join(); } } private: std::vector<std::thread> m_threads; }; void foo_1(boost::circular_buffer<VecBuf> *cb) { while (mRunning) { std::unique_lock<std::mutex> mlock(m_mutex); m_condVar.wait(mlock, [=]() { return !cb->empty(); }); VecBuf local_data(cb->front()); cb->pop_front(); mlock.unlock(); if (!mRunning) { break; } //simulate time consuming function call and consume local_data here std::this_thread::sleep_for(std::chrono::milliseconds(16)); } while (cb->size()) { VecBuf local_data(cb->front()); cb->pop_front(); if (!mRunning) { break; } } } void foo_2(boost::circular_buffer<VecBuf> *cb) { while (mRunning) { std::unique_lock<std::mutex> mlock(m_mutex); while (cb->full()) { mlock.unlock(); /* can we do better than this? */ std::this_thread::sleep_for(std::chrono::milliseconds(100)); mlock.lock(); } cb->push_back(VecBuf(data)); m_condVar.notify_one(); } } int main() { mRunning = true; boost::circular_buffer<VecBuf> cb(100); Detacher thread_1; thread_1.createTask(foo_1, &cb); Detacher thread_2; thread_2.createTask(foo_2, &cb); std::this_thread::sleep_for(std::chrono::milliseconds(20000)); mRunning = false; } 

回答

 /* can we do better than this? */ 

循環バッファーのコンテキストでは、ビジー待機を回避する標準的な方法は、2つのセマフォを使用することです。最初にバッファーがいっぱいになったときにプロデューサーをブロックし、次にバッファーが空になったときにコンシューマーをブロックします。プロセスがセマフォを通過してジョブを実行すると、ピアに信号を送る必要があります。


循環バッファは、コンシューマがたまに遅れる場合に適しています データを失う余裕があります。あなたの状況では、それは間違った解決策のように見えます。プロデューサーは消費率によって抑制され、コンシューマーには古いデータが表示されます。

一般的な答えは、プロデューサーをフルスピードで実行させることです。 、および生成をトリプルバッファリングします(少なくとも、消費者が最後に生成されたデータを取得することを保証します)。恥知らずな自己宣伝をご容赦ください。

コメント

  • レビューありがとうございますコメントとリンク。しかし、私の場合、私の消費者はしばらくの間しかブロックしません。条件変数+ロックを使用する現在のソリューションの代わりにセマフォを使用することはまだ理にかなっていると思います。
  • @nomanpouigt "しばらくの間"は"常に遅い"とはかなり異なります。
  • 申し訳ありませんが、リングバッファを意味しますサイズは、遅延に対応できるように選択されます。 C ++にはセマフォがなく、ミューテックスと条件変数を使用して実装されているので、セマフォを使用するのが理にかなっている場合は疑問に思います。
  • トリプルバッファのソリューションと、私の場合はどのように適用されるかについても詳しく説明してください。 ? 'プロデューサー側でロックを行わずに実行させ、オーバーランが発生した場合にコンシューマーが最新のデータを取得できるように、プロデューサーバッファーをトリプルバッファーにする必要があるということですか?

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です