C ++ vaste grootte wachtrij vol / leeg signalering

Ik denk aan een toepassing waar vaak grote gegevens naar een bestand moeten worden geschreven. Ik zou graag willen gebruiken een wachtrij en laat een producent en een consument op verschillende threads draaien. Bovendien zou ik graag een wachtrij met een vaste grootte willen hebben, aangezien de gegevensgrootte erg groot kan zijn. Ik heb een eenvoudige test geïmplementeerd van boost :: lockfree :: queue met boost :: condition_variable om de status van de wachtrij aan te geven. Ik zou de mutex willen vermijden, maar voor de uitzondering waar de wachtrij vol is (blokproducent) of leeg (blokgebruiker).

Ik zou graag willen weten (met risico op opinie gebaseerd ..) of ik ” m correct gebruik van conditionals of als er een prestatieprobleem is – vergeleken met het gebruik van andere methoden. Dit is wat ik tot dusver heb gedaan (kleine gegevens)

#include <iostream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #define N 6 #define QN 3 struct testdata { int a; int b; }; boost::lockfree::queue<testdata, boost::lockfree::fixed_size<true>> que(QN); boost::condition_variable que_has_data, que_has_room; boost::mutex que_mtx_hd, que_mtx_hr; void producer(void) { testdata td; int i = 0; boost::mutex::scoped_lock lock(que_mtx_hr); boost::this_thread::sleep(boost::posix_time::seconds(1)); for (;;) { td.a = i; td.b = i + 1; if (!que.push(td)) { std::cout << "producer waiting" << std::endl; que_has_room.wait(lock); } else { std::cout << "pushed " << td.a << std::endl; i += 1; que.has_data_notify_one(); } if (i > N) break; } } void consumer(void) { testdata td; boost::mutex::scoped_lock lock(que_mtx_hd); for (;;) { if (que.pop(td)) { std::cout << "popped " << td.a << std::endl; if (td.a == N) break; que_has_room.notify_one(); } else { std::cout << "consumer waiting" << std::endl; que_has_data.wait(lock); } } boost::this_thread::sleep(boost::posix_time::seconds(1)); } int main(void) { boost::thread t1(&producer); boost::thread t2(&consumer); t1.join(); t2.join(); return 0; } 

Dit werkt (output):

consumer waiting pushed 0 pushed 1 pushed 2 producer waiting popped 0 pushed 3 producer waiting popped 1 pushed 4 producer waiting popped 2 pushed 5 producer waiting popped 3 pushed 6 popped 4 popped 5 popped 6 

Ik verwacht dat gegevens voor het grootste deel bijna altijd beschikbaar zullen zijn, maar ik wil blokkeren in geval van congestie ( bestand schrijven, netwerk, enz.). Het belang van een vaste grootte is de zorg van enorme datasets en dynamische toewijzing in de wachtrij –

(Dit is meer een experiment in wat er kan worden gedaan. In werkelijkheid worden mijn gegevens maximaal ongeveer 20 Hz bijgewerkt dus het vergrendelen van een std :: queue waarvan ik de grootte beheer, zal ook heel goed werken.)

Answer

Gebruik een seinpaal om de producers te laten slapen als de wachtrij vol is, en een andere seinpaal om de consumenten te laten slapen als de wachtrij leeg is. wanneer de wachtrij niet vol of leeg is, blokkeren de sem_post en sem_wait-bewerkingen niet (in nieuwere kernels)

#include <semaphore.h> template<typename lock_free_container> class blocking_lock_free { public: lock_free_queue_semaphore(size_t n) : container(n) { sem_init(&pop_semaphore, 0, 0); sem_init(&push_semaphore, 0, n); } ~lock_free_queue_semaphore() { sem_destroy(&pop_semaphore); sem_destroy(&push_semaphore); } bool push(const lock_free_container::value_type& v) { sem_wait(&push_semaphore); bool ret = container::bounded_push(v); ASSERT(ret); if (ret) sem_post(&pop_semaphore); else sem_post(&push_semaphore); // shouldn"t happen return ret; } bool pop(lock_free_container::value_type& v) { sem_wait(&pop_semaphore); bool ret = container::pop(v); ASSERT(ret); if (ret) sem_post(&push_semaphore); else sem_post(&pop_semaphore); // shouldn"t happen return ret; } private: lock_free_container container; sem_t pop_semaphore; sem_t push_semaphore; }; 

Answer

Ik zou erg sceptisch zijn over de code die een vergrendelingsvrije container, twee mutexen en een voorwaardelijke variabele bevat om de wachtrij voor het blokkeren van interthreads te implementeren. Zonder zelfs maar verder te kijken.

Ik zou waarschijnlijk beginnen met het onderstaande prototype (misschien eerst controleren of boost :: interprocess iets heeft dat ik meteen zou kunnen gebruiken):

  1. wrap boost::circular_buffer in facebook/Folly/Synchronized maar met een aangepaste locker die try_lock() doet, en daarna nog 41 keer draait met try_lock(), en blokkeert vervolgens op lock(), waarbij alle drie de scenarios worden geteld, en met notificatie / wachten bovenaan
  2. Ik zou dat vrijgeven voor productie in pilot-modus en controleren of ik echt moeite moet doen met een container zonder slot.

Reacties

  • Ik ‘ volg uw pseudocode niet zo goed, maar ik begrijp uw zorgen. Ik had gerekend dat het gebruik van scoped_lock dit ok zou maken, en dat elk het ‘ s mutex vereist voor het voorwaardelijke wachten.

Antwoord

Boost biedt een single-producer single-consumer wachtrij die lock-free is, denk ik, weet je ervan? Ik denk dat het precies in uw gebruikssituatie past.

http://www.boost.org/doc/libs/1_61_0/doc/html/boost/lockfree/spsc_queue.html

Je kunt een vaste grootte gebruiken en je hebt de consument geblokkeerd als er geen gegevens beschikbaar zijn, enz. Zonder het zelf te implementeren.

Reacties

  • Ik deed merk dit op maar mijn (niet genetwerkte) machine leek dit te missen. Dat gezegd hebbende, denk ik dat pop en push hetzelfde werken, niet-blokkerende terugkerende bool, of ze nu succesvol zijn of niet.

Geef een reactie

Het e-mailadres wordt niet gepubliceerd. Vereiste velden zijn gemarkeerd met *