C ++ – Warteschlange mit fester Größe und voller / leerer Signalisierung

Ich erwäge eine Anwendung, bei der große Datenmengen häufig in eine Datei geschrieben werden müssen. Ich möchte sie verwenden eine Warteschlange und einen Produzenten und Konsumenten, die auf verschiedenen Threads ausgeführt werden. Außerdem möchte ich eine Warteschlange mit fester Größe haben, da die Datengröße sehr groß sein kann. Ich habe einen einfachen Test von boost :: lockfree :: queue mit boost implementiert :: condition_variable, um den Status der Warteschlange anzuzeigen. Ich möchte den Mutex vermeiden, aber für die Ausnahme, dass die Warteschlange voll (Blockproduzent) oder leer (Blockkonsument) ist

Ich würde gerne wissen (auf der Grundlage von Meinungsrisiken ..), ob ich “ Ich verwende Bedingungen richtig oder wenn es ein Leistungsproblem gibt – im Vergleich zu anderen Methoden. Folgendes habe ich bisher getan (kleine Daten)

#include <iostream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #define N 6 #define QN 3 struct testdata { int a; int b; }; boost::lockfree::queue<testdata, boost::lockfree::fixed_size<true>> que(QN); boost::condition_variable que_has_data, que_has_room; boost::mutex que_mtx_hd, que_mtx_hr; void producer(void) { testdata td; int i = 0; boost::mutex::scoped_lock lock(que_mtx_hr); boost::this_thread::sleep(boost::posix_time::seconds(1)); for (;;) { td.a = i; td.b = i + 1; if (!que.push(td)) { std::cout << "producer waiting" << std::endl; que_has_room.wait(lock); } else { std::cout << "pushed " << td.a << std::endl; i += 1; que.has_data_notify_one(); } if (i > N) break; } } void consumer(void) { testdata td; boost::mutex::scoped_lock lock(que_mtx_hd); for (;;) { if (que.pop(td)) { std::cout << "popped " << td.a << std::endl; if (td.a == N) break; que_has_room.notify_one(); } else { std::cout << "consumer waiting" << std::endl; que_has_data.wait(lock); } } boost::this_thread::sleep(boost::posix_time::seconds(1)); } int main(void) { boost::thread t1(&producer); boost::thread t2(&consumer); t1.join(); t2.join(); return 0; } 

Dies funktioniert (Ausgabe):

consumer waiting pushed 0 pushed 1 pushed 2 producer waiting popped 0 pushed 3 producer waiting popped 1 pushed 4 producer waiting popped 2 pushed 5 producer waiting popped 3 pushed 6 popped 4 popped 5 popped 6 

Ich gehe davon aus, dass zum größten Teil Daten fast immer verfügbar sind, aber ich möchte sie im Falle einer Überlastung blockieren ( Dateischreiben, Netzwerk usw.). Das Interesse an fester Größe ist die Sorge um massive Datensätze und dynamische Zuordnung in der Warteschlange –

(Dies ist eher ein Experiment darüber, was getan werden kann. In Wirklichkeit werden meine Daten höchstens um 20 Hz aktualisiert Wenn Sie also nur eine std :: -Warteschlange sperren, deren Größe ich verwalte, funktioniert dies ebenfalls sehr gut.)

Antwort

Verwenden Sie ein Semaphor, um die Produzenten in den Ruhezustand zu versetzen, wenn die Warteschlange voll ist, und ein anderes Semaphor, um die Konsumenten in den Ruhezustand zu versetzen, wenn die Warteschlange leer ist. Wenn die Warteschlange weder voll noch leer ist, blockieren die Operationen sem_post und sem_wait nicht (in neueren Kerneln).

#include <semaphore.h> template<typename lock_free_container> class blocking_lock_free { public: lock_free_queue_semaphore(size_t n) : container(n) { sem_init(&pop_semaphore, 0, 0); sem_init(&push_semaphore, 0, n); } ~lock_free_queue_semaphore() { sem_destroy(&pop_semaphore); sem_destroy(&push_semaphore); } bool push(const lock_free_container::value_type& v) { sem_wait(&push_semaphore); bool ret = container::bounded_push(v); ASSERT(ret); if (ret) sem_post(&pop_semaphore); else sem_post(&push_semaphore); // shouldn"t happen return ret; } bool pop(lock_free_container::value_type& v) { sem_wait(&pop_semaphore); bool ret = container::pop(v); ASSERT(ret); if (ret) sem_post(&push_semaphore); else sem_post(&pop_semaphore); // shouldn"t happen return ret; } private: lock_free_container container; sem_t pop_semaphore; sem_t push_semaphore; }; 

Antwort

Ich wäre sehr skeptisch gegenüber dem Code, der einen sperrfreien Container, zwei Mutexe und eine bedingte Variable zum Implementieren der Interthread-Blockierungswarteschlange enthält. Ohne weiter zu schauen.

Ich würde wahrscheinlich mit dem folgenden Prototyp beginnen (vielleicht zuerst prüfen, ob boost :: interprocess etwas enthält, das ich sofort verwenden könnte):

  1. wrap boost::circular_buffer in facebook/Folly/Synchronized, aber mit einem benutzerdefinierten Schließfach, das try_lock() ausführt, dreht sich dann 41 weitere Male mit try_lock(), blockiert dann lock(), zählt die Vorkommen aller drei Szenarien und mit Benachrichtigung / Warten oben
  2. Ich würde das im Pilotmodus für die Produktion freigeben und prüfen, ob ich mich wirklich um einen sperrfreien Container kümmern muss.

Kommentare

  • Ich ‚ folge Ihrem Pseudocode nicht sehr gut, aber ich verstehe die Bedenken, die Sie haben. Ich hatte damit gerechnet, dass die Verwendung von scoped_lock dies in Ordnung bringen würde und dass jeder den Mutex ‚ für das bedingte Warten benötigt.

Antwort

Boost bietet eine Single-Producer-Single-Consumer-Warteschlange, die sperrfrei ist. Ich denke, wissen Sie davon? Ich denke, es passt genau zu Ihrem Anwendungsfall.

http://www.boost.org/doc/libs/1_61_0/doc/html/boost/lockfree/spsc_queue.html

Sie können eine feste Größe verwenden und haben den Verbraucher blockiert, wenn keine Daten verfügbar sind usw., ohne sie selbst zu implementieren.

Kommentare

  • Ich habe es getan Beachten Sie dies, aber meinem (nicht vernetzten) Computer schien dies zu fehlen. Trotzdem glaube ich, dass Pop und Push gleich funktionieren und den zurückkehrenden Bool nicht blockieren, egal ob erfolgreich oder nicht.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.