大きなデータを頻繁にファイルに書き込む必要があるアプリケーションを検討しています。使用したいキューがあり、プロデューサーとコンシューマーが異なるスレッドで実行されています。さらに、データサイズが非常に大きくなる可能性があるため、固定サイズのキューが必要です。boost:: lockfree :: queueとboostの簡単なテストを実装しました。 :: condition_variableは、キューの状態を通知します。ミューテックスを避けたいのですが、キューがいっぱい(ブロックプロデューサー)または空(ブロックコンシューマー)の場合を除いて
(意見に基づくリスクがある)私が知りたいのですが」 m条件を適切に使用するか、パフォーマンスの問題がある場合-他の方法を使用する場合と比較します。これまでに行ったことは次のとおりです(小さなデータ)
#include <iostream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #define N 6 #define QN 3 struct testdata { int a; int b; }; boost::lockfree::queue<testdata, boost::lockfree::fixed_size<true>> que(QN); boost::condition_variable que_has_data, que_has_room; boost::mutex que_mtx_hd, que_mtx_hr; void producer(void) { testdata td; int i = 0; boost::mutex::scoped_lock lock(que_mtx_hr); boost::this_thread::sleep(boost::posix_time::seconds(1)); for (;;) { td.a = i; td.b = i + 1; if (!que.push(td)) { std::cout << "producer waiting" << std::endl; que_has_room.wait(lock); } else { std::cout << "pushed " << td.a << std::endl; i += 1; que.has_data_notify_one(); } if (i > N) break; } } void consumer(void) { testdata td; boost::mutex::scoped_lock lock(que_mtx_hd); for (;;) { if (que.pop(td)) { std::cout << "popped " << td.a << std::endl; if (td.a == N) break; que_has_room.notify_one(); } else { std::cout << "consumer waiting" << std::endl; que_has_data.wait(lock); } } boost::this_thread::sleep(boost::posix_time::seconds(1)); } int main(void) { boost::thread t1(&producer); boost::thread t2(&consumer); t1.join(); t2.join(); return 0; }
これは機能します(出力):
consumer waiting pushed 0 pushed 1 pushed 2 producer waiting popped 0 pushed 3 producer waiting popped 1 pushed 4 producer waiting popped 2 pushed 5 producer waiting popped 3 pushed 6 popped 4 popped 5 popped 6
ほとんどの場合、データは常に利用可能であると思いますが、混雑した場合はブロックしたいと思います(ファイル書き込み、ネットワークなど)。固定サイズへの関心は、大量のデータセットとキュー内の動的割り当ての心配です-
(これは何ができるかについての実験です。実際には、私のデータは最大で約20Hzで更新されます。したがって、サイズを管理するstd :: queueをロックするだけでも、非常にうまく機能します。)
回答
セマフォを使用して、キューがいっぱいになったときにプロデューサーをスリープさせ、別のセマフォを使用して、キューが空になったときにコンシューマーをスリープさせます。キューがいっぱいでも空でもない場合、sem_postおよびsem_wait操作は非ブロッキングです(新しいカーネルの場合)
#include <semaphore.h> template<typename lock_free_container> class blocking_lock_free { public: lock_free_queue_semaphore(size_t n) : container(n) { sem_init(&pop_semaphore, 0, 0); sem_init(&push_semaphore, 0, n); } ~lock_free_queue_semaphore() { sem_destroy(&pop_semaphore); sem_destroy(&push_semaphore); } bool push(const lock_free_container::value_type& v) { sem_wait(&push_semaphore); bool ret = container::bounded_push(v); ASSERT(ret); if (ret) sem_post(&pop_semaphore); else sem_post(&push_semaphore); // shouldn"t happen return ret; } bool pop(lock_free_container::value_type& v) { sem_wait(&pop_semaphore); bool ret = container::pop(v); ASSERT(ret); if (ret) sem_post(&push_semaphore); else sem_post(&pop_semaphore); // shouldn"t happen return ret; } private: lock_free_container container; sem_t pop_semaphore; sem_t push_semaphore; };
回答
ロックフリーコンテナ、2つのミューテックス、およびスレッド間ブロッキングキューを実装するための条件変数を含むコードについては非常に懐疑的です。さらに詳しく調べることなく。
おそらく以下のプロトタイプから始めます(おそらく、boost :: interprocessにすぐに使用できるものがあるかどうかを最初に確認します):
- wrap
boost::circular_buffer
をfacebook/Folly/Synchronized
に追加しますが、try_lock()
を実行するカスタムロッカーを使用すると、さらに41回スピンします。try_lock()
次に、lock()
でブロックし、3つのシナリオすべての発生をカウントし、通知/待機を最上位にします - それをパイロットモードで本番環境にリリースし、ロックフリーコンテナを本当に気にする必要があるかどうかを確認します。
コメント
- 私は'疑似コードにうまく従っていませんが、あなたの懸念は理解しています。 scoped_lockを使用するとこれで問題がなく、それぞれが条件付き待機のために'ミューテックスを必要とすることを私は考えていました。
回答
Boostは、ロックフリーの単一プロデューサー単一消費者キューを提供します。それについて知っていますか?ユースケースにぴったりだと思います。
http://www.boost.org/doc/libs/1_61_0/doc/html/boost/lockfree/spsc_queue.html
固定サイズを使用でき、データが利用できない場合など、自分で実装せずにコンシューマーをブロックすることになります。
コメント
- 実行しましたこれに注意してください、しかし私の(ネットワーク化されていない)マシンはこれを見逃しているようでした。とはいえ、ポップとプッシュは同じように機能し、成功したかどうかに関係なく、ブール値をブロックせずに返すと思います。