Jeg overvejer en applikation, hvor der ofte skal skrives store data til filen. Jeg vil gerne bruge en kø og have en producent og forbruger, der kører på forskellige tråde. Derudover vil jeg gerne have en kø i fast størrelse, da datastørrelsen kan være meget stor. Jeg har implementeret en simpel test af boost :: lockfree :: kø med boost :: condition_variable til at signalere køens tilstand. Jeg vil gerne undgå mutex, men med undtagelse af, hvor køen er fuld (blokproducent) eller tom (blok forbruger)
vil jeg gerne vide (med risiko for meningsbaseret ..) om jeg ” m bruger korrekt betingelser, eller hvis der er et præstationsproblem – sammenlignet med andre metoder. Her er hvad jeg har gjort indtil videre (små data)
#include <iostream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #define N 6 #define QN 3 struct testdata { int a; int b; }; boost::lockfree::queue<testdata, boost::lockfree::fixed_size<true>> que(QN); boost::condition_variable que_has_data, que_has_room; boost::mutex que_mtx_hd, que_mtx_hr; void producer(void) { testdata td; int i = 0; boost::mutex::scoped_lock lock(que_mtx_hr); boost::this_thread::sleep(boost::posix_time::seconds(1)); for (;;) { td.a = i; td.b = i + 1; if (!que.push(td)) { std::cout << "producer waiting" << std::endl; que_has_room.wait(lock); } else { std::cout << "pushed " << td.a << std::endl; i += 1; que.has_data_notify_one(); } if (i > N) break; } } void consumer(void) { testdata td; boost::mutex::scoped_lock lock(que_mtx_hd); for (;;) { if (que.pop(td)) { std::cout << "popped " << td.a << std::endl; if (td.a == N) break; que_has_room.notify_one(); } else { std::cout << "consumer waiting" << std::endl; que_has_data.wait(lock); } } boost::this_thread::sleep(boost::posix_time::seconds(1)); } int main(void) { boost::thread t1(&producer); boost::thread t2(&consumer); t1.join(); t2.join(); return 0; }
Dette fungerer (output):
consumer waiting pushed 0 pushed 1 pushed 2 producer waiting popped 0 pushed 3 producer waiting popped 1 pushed 4 producer waiting popped 2 pushed 5 producer waiting popped 3 pushed 6 popped 4 popped 5 popped 6
Jeg forventer, at data for det meste næsten altid vil være tilgængelige, men jeg vil blokere i tilfælde af overbelastning ( filskrivning, netværk osv.). Interessen for fast størrelse er bekymringen for massive datasæt og dynamisk fordeling i køen –
(Dette er mere et eksperiment med, hvad der kan gøres. I virkeligheden opdateres mine data højst omkring 20 Hz så bare at tage en lås på en std :: -kø, som jeg administrerer størrelsen på, fungerer også meget godt.)
Svar
Brug en semafor for at få producenterne til at sove, når køen er fuld, og en anden semafor for at få forbrugerne til at sove, når køen er tom. når køen hverken er fuld eller tom, blokeres sem_post og sem_wait-operationerne (i nyere kerner)
#include <semaphore.h> template<typename lock_free_container> class blocking_lock_free { public: lock_free_queue_semaphore(size_t n) : container(n) { sem_init(&pop_semaphore, 0, 0); sem_init(&push_semaphore, 0, n); } ~lock_free_queue_semaphore() { sem_destroy(&pop_semaphore); sem_destroy(&push_semaphore); } bool push(const lock_free_container::value_type& v) { sem_wait(&push_semaphore); bool ret = container::bounded_push(v); ASSERT(ret); if (ret) sem_post(&pop_semaphore); else sem_post(&push_semaphore); // shouldn"t happen return ret; } bool pop(lock_free_container::value_type& v) { sem_wait(&pop_semaphore); bool ret = container::pop(v); ASSERT(ret); if (ret) sem_post(&push_semaphore); else sem_post(&pop_semaphore); // shouldn"t happen return ret; } private: lock_free_container container; sem_t pop_semaphore; sem_t push_semaphore; };
Svar
Jeg ville være meget skeptisk over for koden, der indeholder en låsefri beholder, to mutexer og en betinget variabel til implementering af blokeringskø for interthread. Uden at kigge længere.
Jeg vil sandsynligvis starte fra prototypen nedenfor (måske først kontrollere, om boost :: interprocess har noget, som jeg kunne bruge med det samme):
- wrap
boost::circular_buffer
tilfacebook/Folly/Synchronized
men med brugerdefineret skab, der gørtry_lock()
, og derefter drejer 41 flere gange medtry_lock()
, blokerer derefter pålock()
, tæller forekomster af alle tre scenarier og med notify / wait ovenpå - Jeg ville frigive det til produktion i pilottilstand og kontrollere, om jeg virkelig har brug for at gider med en låsefri container.
Kommentarer
- Jeg ‘ følger ikke din pseudokode meget godt, men jeg forstår de bekymringer, du har. Jeg havde regnet med, at brug af scoped_lock ville gøre dette ok, og at hver kræver det ‘ s mutex for den betingede ventetid.
Svar
Boost giver en enkeltproducent-kø for én forbruger, der er låsefri, tror jeg, ved du noget om det? Jeg synes, det passer nøjagtigt til din brugssag.
http://www.boost.org/doc/libs/1_61_0/doc/html/boost/lockfree/spsc_queue.html
Du kan bruge fast størrelse, og du vil have blokeret forbruger, hvis ikke data er tilgængelige osv. Uden at implementere det selv.
Kommentarer
- Jeg gjorde bemærk dette, men min (ikke netværksmæssige) maskine syntes at mangle dette. Når det er sagt, tror jeg, at pop og push fungerer det samme, ikke-blokerende tilbagevendende bool, uanset om det lykkes eller ej.