Sygnalizacja pełnej / pustej kolejki o stałym rozmiarze w C ++

Rozważam aplikację, w której duże dane muszą być często zapisywane do pliku. Chciałbym użyć kolejka i mieć producenta i konsumenta działających w różnych wątkach. Dodatkowo chciałbym mieć kolejkę o stałym rozmiarze, ponieważ rozmiar danych może być bardzo duży. Zaimplementowałem prosty test funkcji boost :: lockfree :: queue z boost :: zmienna_warunkowa sygnalizująca stan kolejki. Chciałbym uniknąć muteksu, ale w przypadku wyjątku, w którym kolejka jest pełna (producent blokowy) lub pusta (klient blokowy)

Chciałbym wiedzieć (ryzykując opinię na podstawie opinii …), czy ja ” m prawidłowo używam warunków warunkowych lub występuje problem z wydajnością – w porównaniu z innymi metodami. Oto, co zrobiłem do tej pory (małe dane)

#include <iostream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #define N 6 #define QN 3 struct testdata { int a; int b; }; boost::lockfree::queue<testdata, boost::lockfree::fixed_size<true>> que(QN); boost::condition_variable que_has_data, que_has_room; boost::mutex que_mtx_hd, que_mtx_hr; void producer(void) { testdata td; int i = 0; boost::mutex::scoped_lock lock(que_mtx_hr); boost::this_thread::sleep(boost::posix_time::seconds(1)); for (;;) { td.a = i; td.b = i + 1; if (!que.push(td)) { std::cout << "producer waiting" << std::endl; que_has_room.wait(lock); } else { std::cout << "pushed " << td.a << std::endl; i += 1; que.has_data_notify_one(); } if (i > N) break; } } void consumer(void) { testdata td; boost::mutex::scoped_lock lock(que_mtx_hd); for (;;) { if (que.pop(td)) { std::cout << "popped " << td.a << std::endl; if (td.a == N) break; que_has_room.notify_one(); } else { std::cout << "consumer waiting" << std::endl; que_has_data.wait(lock); } } boost::this_thread::sleep(boost::posix_time::seconds(1)); } int main(void) { boost::thread t1(&producer); boost::thread t2(&consumer); t1.join(); t2.join(); return 0; } 

To działa (wyjście):

consumer waiting pushed 0 pushed 1 pushed 2 producer waiting popped 0 pushed 3 producer waiting popped 1 pushed 4 producer waiting popped 2 pushed 5 producer waiting popped 3 pushed 6 popped 4 popped 5 popped 6 

Przewiduję, że w większości przypadków dane będą prawie zawsze dostępne, ale chcę je blokować w przypadku zatorów ( zapis plików, sieć itp.). Zainteresowanie stałym rozmiarem polega na zmartwieniu ogromnymi zbiorami danych i dynamiczną alokacją w kolejce –

(To bardziej eksperyment dotyczący tego, co można zrobić. W rzeczywistości moje dane są aktualizowane co najwyżej około 20 Hz więc samo zablokowanie kolejki std ::, której rozmiarem zarządzam, też będzie działać bardzo dobrze.)

Odpowiedź

Użyj semafora, aby spowodować uśpienie producentów, gdy kolejka jest pełna, oraz innego semafora, aby uśpić konsumentów, gdy kolejka jest pusta. gdy kolejka nie jest ani pełna, ani pusta, operacje sem_post i sem_wait są nieblokujące (w nowszych jądrach)

#include <semaphore.h> template<typename lock_free_container> class blocking_lock_free { public: lock_free_queue_semaphore(size_t n) : container(n) { sem_init(&pop_semaphore, 0, 0); sem_init(&push_semaphore, 0, n); } ~lock_free_queue_semaphore() { sem_destroy(&pop_semaphore); sem_destroy(&push_semaphore); } bool push(const lock_free_container::value_type& v) { sem_wait(&push_semaphore); bool ret = container::bounded_push(v); ASSERT(ret); if (ret) sem_post(&pop_semaphore); else sem_post(&push_semaphore); // shouldn"t happen return ret; } bool pop(lock_free_container::value_type& v) { sem_wait(&pop_semaphore); bool ret = container::pop(v); ASSERT(ret); if (ret) sem_post(&push_semaphore); else sem_post(&pop_semaphore); // shouldn"t happen return ret; } private: lock_free_container container; sem_t pop_semaphore; sem_t push_semaphore; }; 

Odpowiedź

Byłbym bardzo sceptyczny co do kodu zawierającego kontener bez blokady, dwa muteksy i zmienną warunkową do implementacji kolejki blokowania między wątkami. Nawet nie szukając dalej.

Prawdopodobnie zacząłbym od poniższego prototypu (może najpierw sprawdziłem, czy boost :: interprocess ma coś, czego mógłbym użyć od razu):

  1. wrap boost::circular_buffer do facebook/Folly/Synchronized, ale z niestandardową szafką, która try_lock(), a następnie obraca się jeszcze 41 razy z try_lock(), a następnie blokuje lock(), zliczając wystąpienia wszystkich trzech scenariuszy i z powiadomieniem / czekaniem na górze
  2. Wypuściłbym to do produkcji w trybie pilotażowym i sprawdziłbym, czy naprawdę muszę zawracać sobie głowę kontenerem bez blokady.

Komentarze

  • ' Nie rozumiem Twojego pseudokodu zbyt dobrze, ale rozumiem Twoje obawy. Sądziłem, że użycie scoped_lock sprawi, że to będzie w porządku i że każdy z nich wymaga ' mutexu do warunkowego oczekiwania.

Odpowiedź

Boost zapewnia kolejkę jednego producenta dla jednego konsumenta, która jest wolna od blokad Myślę, że wiesz o tym? Myślę, że dokładnie pasuje do Twojego przypadku użycia.

http://www.boost.org/doc/libs/1_61_0/doc/html/boost/lockfree/spsc_queue.html

Możesz użyć stałego rozmiaru i zablokujesz konsumenta, jeśli nie będą dostępne dane itp. Bez implementacji go samodzielnie.

Komentarze

  • Tak zauważ to, ale wydawało się, że moja (nie podłączona do sieci) maszyna nie ma tego. To powiedziawszy, uważam, że pop i push działają tak samo, nie blokując, zwracając bool, niezależnie od tego, czy się powiedzie, czy nie.

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *