Ho due thread, uno è il produttore e laltro è il consumatore. Il mio consumatore è sempre in ritardo (a causa di qualche chiamata di funzione costosa, simulata nel codice sottostante usando sleeps) quindi ho usato il ring buffer poiché posso permettermi di perdere alcuni eventi.
Sto cercando di vedere se il mio blocco sembra a posto e commenti generali di revisione in c ++.
#include <iostream> #include <thread> #include <chrono> #include <vector> #include <atomic> #include <boost/circular_buffer.hpp> #include <condition_variable> #include <functional> std::atomic<bool> mRunning; std::mutex m_mutex; std::condition_variable m_condVar; class VecBuf { private: std::vector<int8_t> vec; public: VecBuf() = default; VecBuf(std::vector<int8_t> v) { vec = v; } }; std::vector<int8_t> data{ 10, 20, 30 }; class Detacher { public: template<typename Function, typename ... Args> void createTask(Function &&func, Args&& ... args) { m_threads.emplace_back(std::forward<Function>(func), std::forward<Args>(args)...); } Detacher() = default; Detacher(const Detacher&) = delete; Detacher & operator=(const Detacher&) = delete; Detacher(Detacher&&) = default; Detacher& operator=(Detacher&&) = default; ~Detacher() { for (auto& thread : m_threads) { thread.join(); } } private: std::vector<std::thread> m_threads; }; void foo_1(boost::circular_buffer<VecBuf> *cb) { while (mRunning) { std::unique_lock<std::mutex> mlock(m_mutex); m_condVar.wait(mlock, [=]() { return !cb->empty(); }); VecBuf local_data(cb->front()); cb->pop_front(); mlock.unlock(); if (!mRunning) { break; } //simulate time consuming function call and consume local_data here std::this_thread::sleep_for(std::chrono::milliseconds(16)); } while (cb->size()) { VecBuf local_data(cb->front()); cb->pop_front(); if (!mRunning) { break; } } } void foo_2(boost::circular_buffer<VecBuf> *cb) { while (mRunning) { std::unique_lock<std::mutex> mlock(m_mutex); while (cb->full()) { mlock.unlock(); /* can we do better than this? */ std::this_thread::sleep_for(std::chrono::milliseconds(100)); mlock.lock(); } cb->push_back(VecBuf(data)); m_condVar.notify_one(); } } int main() { mRunning = true; boost::circular_buffer<VecBuf> cb(100); Detacher thread_1; thread_1.createTask(foo_1, &cb); Detacher thread_2; thread_2.createTask(foo_2, &cb); std::this_thread::sleep_for(std::chrono::milliseconds(20000)); mRunning = false; }
Risposta
/* can we do better than this? */
In un contesto di buffer circolare, il modo standard per evitare unattesa impegnata è di avere due semafori. Il primo a bloccare un produttore quando un buffer è pieno e il secondo a bloccare un consumatore quando il buffer è vuoto. Una volta che un processo supera il suo semaforo e fa il suo lavoro, dovrebbe segnalare il peer.
Il buffer circolare è buono quando il consumatore è solo a volte in ritardo e non puoi permettersi di perdere dati. Nella tua situazione sembra una soluzione sbagliata: il produttore viene strozzato dal tasso di consumo e al consumatore vengono presentati dati obsoleti.
Una risposta tipica è lasciare che il produttore funzioni a piena velocità e triplicare il buffer della produzione (almeno, garantisce che il consumatore ottenga i dati prodotti più di recente). Per favore perdona la spudorata autopromozione .
Commenti
- grazie per la recensione commento e il collegamento. Tuttavia, nel mio caso il mio consumatore blocca solo un po di tempo. Penso che abbia ancora senso utilizzare i semafori invece della mia attuale soluzione utilizzando la variabile di condizione + lock.
- @nomanpouigt " qualche volta " è abbastanza diverso da " sempre in ritardo ".
- scusa, intendo il ring buffer la dimensione è scelta in modo tale da poter adattare il ritardo. Mi chiedo che in c ++ non ci siano semafori ed è implementato usando mutex e variabile di condizione, quindi se ha senso usare i semafori.
- Puoi anche elaborare la tua soluzione di triplo buffer e come si applicherebbe nel mio caso ? Vuoi dire che ' non effettua alcun blocco dal lato produttore e lascialo funzionare e il buffer del produttore deve essere triplicato in modo che il consumatore possa ottenere dati recenti in caso di sovraccarico?