C ++ dimensiune fixă coadă semnalizare completă / goală

Mă gândesc la o aplicație în care trebuie să fie scrise date mari pentru a înregistra deseori. Aș dori să folosesc o coadă și un producător și un consumator rulează pe fire diferite. În plus, aș dori să am o coadă de dimensiuni fixe, deoarece dimensiunea datelor poate fi foarte mare. Am implementat un test simplu de boost :: lockfree :: coadă cu boost :: condition_variable pentru a semnaliza starea cozii. Aș dori să evit mutex-ul, dar pentru excepția în care coada este plină (bloc producător) sau goală (bloc consumator)

Aș dori să știu (cu riscul de opinie bazat pe ..) dacă ” Folosesc condiționalele în mod corespunzător sau dacă există o problemă de performanță – în comparație cu utilizarea altor metode. Iată ce am făcut până acum (date mici)

#include <iostream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #define N 6 #define QN 3 struct testdata { int a; int b; }; boost::lockfree::queue<testdata, boost::lockfree::fixed_size<true>> que(QN); boost::condition_variable que_has_data, que_has_room; boost::mutex que_mtx_hd, que_mtx_hr; void producer(void) { testdata td; int i = 0; boost::mutex::scoped_lock lock(que_mtx_hr); boost::this_thread::sleep(boost::posix_time::seconds(1)); for (;;) { td.a = i; td.b = i + 1; if (!que.push(td)) { std::cout << "producer waiting" << std::endl; que_has_room.wait(lock); } else { std::cout << "pushed " << td.a << std::endl; i += 1; que.has_data_notify_one(); } if (i > N) break; } } void consumer(void) { testdata td; boost::mutex::scoped_lock lock(que_mtx_hd); for (;;) { if (que.pop(td)) { std::cout << "popped " << td.a << std::endl; if (td.a == N) break; que_has_room.notify_one(); } else { std::cout << "consumer waiting" << std::endl; que_has_data.wait(lock); } } boost::this_thread::sleep(boost::posix_time::seconds(1)); } int main(void) { boost::thread t1(&producer); boost::thread t2(&consumer); t1.join(); t2.join(); return 0; } 

Acest lucru funcționează (ieșire):

consumer waiting pushed 0 pushed 1 pushed 2 producer waiting popped 0 pushed 3 producer waiting popped 1 pushed 4 producer waiting popped 2 pushed 5 producer waiting popped 3 pushed 6 popped 4 popped 5 popped 6 

Anticipez că, în cea mai mare parte, datele vor fi aproape întotdeauna disponibile, dar vreau să le blochez în caz de congestie ( scriere de fișiere, rețea etc.). Interesul pentru dimensiunea fixă este îngrijorarea seturilor masive de date și a alocării dinamice în coadă –

(Acesta este mai mult un experiment în ceea ce se poate face. În realitate, datele mele sunt actualizate cel mult cu aproximativ 20 Hz deci, doar luând o blocare pe o coadă std :: de care gestionez dimensiunea va funcționa foarte bine.)

Răspunde

Utilizați un semafor pentru a determina producătorii să doarmă când coada este plină și un alt semafor pentru a determina consumatorii să doarmă când coada este goală. când coada nu este nici completă, nici goală, operațiile sem_post și sem_wait nu sunt de blocare (în nuclee mai noi)

#include <semaphore.h> template<typename lock_free_container> class blocking_lock_free { public: lock_free_queue_semaphore(size_t n) : container(n) { sem_init(&pop_semaphore, 0, 0); sem_init(&push_semaphore, 0, n); } ~lock_free_queue_semaphore() { sem_destroy(&pop_semaphore); sem_destroy(&push_semaphore); } bool push(const lock_free_container::value_type& v) { sem_wait(&push_semaphore); bool ret = container::bounded_push(v); ASSERT(ret); if (ret) sem_post(&pop_semaphore); else sem_post(&push_semaphore); // shouldn"t happen return ret; } bool pop(lock_free_container::value_type& v) { sem_wait(&pop_semaphore); bool ret = container::pop(v); ASSERT(ret); if (ret) sem_post(&push_semaphore); else sem_post(&pop_semaphore); // shouldn"t happen return ret; } private: lock_free_container container; sem_t pop_semaphore; sem_t push_semaphore; }; 

Răspuns

Aș fi foarte sceptic cu privire la codul care conține un container fără blocare, două mutexe și o variabilă condițională pentru a implementa coada de blocare interthread. Fără să mă uit mai departe.

Aș începe probabil de la prototipul de mai jos (poate verific mai întâi dacă boost :: interprocess are ceva ce aș putea folosi imediat):

  1. wrap boost::circular_buffer în facebook/Folly/Synchronized, dar cu dulap personalizat care face try_lock(), apoi se rotește de 41 de ori mai mult cu try_lock(), apoi blochează lock(), numărând aparițiile tuturor celor trei scenarii și cu notificare / așteptare în partea de sus
  2. Aș lansa acest lucru în producție în modul pilot și aș verifica dacă chiar trebuie să mă deranjez cu un container fără blocare.

Comentarii

  • Nu ‘ nu îți urmez pseudo codul prea bine, dar înțeleg preocupările pe care le ai. Considerasem că utilizarea scoped_lock ar face acest lucru ok și că fiecare necesită ‘ s mutex pentru așteptarea condiționată.

Răspuns

Boost oferă o coadă pentru un singur producător, care nu este blocată, cred, știți despre asta? Cred că se potrivește exact cazului dvs. de utilizare.

http://www.boost.org/doc/libs/1_61_0/doc/html/boost/lockfree/spsc_queue.html

Puteți utiliza dimensiunea fixă și veți fi blocat consumatorul dacă nu sunt date disponibile etc. fără a le implementa singur.

Comentarii

  • Am făcut-o observați acest lucru, dar mașina mea (care nu este conectată în rețea) părea să lipsească. Acestea fiind spuse, cred că pop-ul și push-ul funcționează la fel, fără blocarea întoarcerii boolului, indiferent dacă are succes sau nu.

Lasă un răspuns

Adresa ta de email nu va fi publicată. Câmpurile obligatorii sunt marcate cu *