Sto considerando unapplicazione in cui è necessario scrivere spesso su file dati di grandi dimensioni. Vorrei utilizzare una coda e avere un produttore e un consumatore in esecuzione su thread diversi. Inoltre, vorrei avere una coda di dimensioni fisse poiché la dimensione dei dati può essere molto grande. Ho implementato un semplice test di boost :: lockfree :: queue con boost :: condition_variable per segnalare lo stato della coda. Vorrei evitare il mutex ma per leccezione in cui la coda è piena (block producer) o vuota (block consumer)
vorrei sapere (a rischio basato sullopinione ..) se io ” sto usando correttamente i condizionali o se cè un problema di prestazioni rispetto alluso di altri metodi. Ecco cosa ho fatto finora (piccoli dati)
#include <iostream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #define N 6 #define QN 3 struct testdata { int a; int b; }; boost::lockfree::queue<testdata, boost::lockfree::fixed_size<true>> que(QN); boost::condition_variable que_has_data, que_has_room; boost::mutex que_mtx_hd, que_mtx_hr; void producer(void) { testdata td; int i = 0; boost::mutex::scoped_lock lock(que_mtx_hr); boost::this_thread::sleep(boost::posix_time::seconds(1)); for (;;) { td.a = i; td.b = i + 1; if (!que.push(td)) { std::cout << "producer waiting" << std::endl; que_has_room.wait(lock); } else { std::cout << "pushed " << td.a << std::endl; i += 1; que.has_data_notify_one(); } if (i > N) break; } } void consumer(void) { testdata td; boost::mutex::scoped_lock lock(que_mtx_hd); for (;;) { if (que.pop(td)) { std::cout << "popped " << td.a << std::endl; if (td.a == N) break; que_has_room.notify_one(); } else { std::cout << "consumer waiting" << std::endl; que_has_data.wait(lock); } } boost::this_thread::sleep(boost::posix_time::seconds(1)); } int main(void) { boost::thread t1(&producer); boost::thread t2(&consumer); t1.join(); t2.join(); return 0; }
Funziona (output):
consumer waiting pushed 0 pushed 1 pushed 2 producer waiting popped 0 pushed 3 producer waiting popped 1 pushed 4 producer waiting popped 2 pushed 5 producer waiting popped 3 pushed 6 popped 4 popped 5 popped 6
Prevedo che per la maggior parte i dati saranno quasi sempre disponibili ma voglio bloccarli in caso di congestione ( scrittura di file, rete, ecc.). Linteresse per le dimensioni fisse è la preoccupazione di enormi set di dati e allocazione dinamica nella coda –
(Questo è più un esperimento su ciò che può essere fatto. In realtà, i miei dati vengono aggiornati al massimo a circa 20 Hz quindi anche solo bloccare una std :: queue di cui gestisco la dimensione funzionerà molto bene.)
Answer
Usa un semaforo per far dormire i produttori quando la coda è piena e un altro semaforo per far dormire i consumatori quando la coda è vuota. quando la coda non è né piena né vuota, le operazioni sem_post e sem_wait sono non bloccanti (nei kernel più recenti)
#include <semaphore.h> template<typename lock_free_container> class blocking_lock_free { public: lock_free_queue_semaphore(size_t n) : container(n) { sem_init(&pop_semaphore, 0, 0); sem_init(&push_semaphore, 0, n); } ~lock_free_queue_semaphore() { sem_destroy(&pop_semaphore); sem_destroy(&push_semaphore); } bool push(const lock_free_container::value_type& v) { sem_wait(&push_semaphore); bool ret = container::bounded_push(v); ASSERT(ret); if (ret) sem_post(&pop_semaphore); else sem_post(&push_semaphore); // shouldn"t happen return ret; } bool pop(lock_free_container::value_type& v) { sem_wait(&pop_semaphore); bool ret = container::pop(v); ASSERT(ret); if (ret) sem_post(&push_semaphore); else sem_post(&pop_semaphore); // shouldn"t happen return ret; } private: lock_free_container container; sem_t pop_semaphore; sem_t push_semaphore; };
Risposta
Sarei molto scettico riguardo al codice contenente un contenitore lockfree, due mutex e una variabile condizionale per implementare la coda di blocco interthread. Senza nemmeno guardare oltre.
Probabilmente inizierei dal prototipo di seguito (magari prima controllando se boost :: interprocess ha qualcosa che potrei usare subito):
- wrap
boost::circular_buffer
infacebook/Folly/Synchronized
ma con un armadietto personalizzato che eseguetry_lock()
, quindi gira altre 41 volte contry_lock()
, quindi bloccalock()
, contando le occorrenze di tutti e tre gli scenari e con notifica / attesa in alto - Lo rilascerei in produzione in modalità pilota e verificherei se ho davvero bisogno di preoccuparmi di un contenitore senza blocchi.
Commenti
- ‘ non seguo molto bene il tuo pseudo codice ma capisco le tue preoccupazioni. Avevo pensato che luso di scoped_lock avrebbe reso tutto ok e che ciascuno lo richiedesse ‘ s mutex per lattesa condizionale.
Risposta
Boost fornisce una coda a singolo produttore per consumatore singolo che è senza blocchi, penso, lo sai? Penso che si adatti esattamente al tuo caso duso.
http://www.boost.org/doc/libs/1_61_0/doc/html/boost/lockfree/spsc_queue.html
Puoi utilizzare dimensioni fisse e avrai bloccato il consumatore se non i dati disponibili, ecc. Senza implementarlo da solo.
Commenti
- Lho fatto notare questo ma la mia macchina (non collegata in rete) sembrava mancare di questo. Detto questo, credo che pop e push funzionino allo stesso modo, senza bloccare la restituzione di bool indipendentemente dal fatto che abbia successo o meno.