Mam dwa wątki, jeden jest producentem, a drugi konsumentem. Mój konsument zawsze się spóźnia (z powodu kosztownego wywołania funkcji, symulowanego w poniższym kodzie przy użyciu uśpienia), więc użyłem bufora pierścieniowego, ponieważ mogę sobie pozwolić na utratę niektórych zdarzeń.
Szukam, aby sprawdzić, czy moje blokowanie wygląda dobrze i ogólne komentarze z przeglądu C ++.
#include <iostream> #include <thread> #include <chrono> #include <vector> #include <atomic> #include <boost/circular_buffer.hpp> #include <condition_variable> #include <functional> std::atomic<bool> mRunning; std::mutex m_mutex; std::condition_variable m_condVar; class VecBuf { private: std::vector<int8_t> vec; public: VecBuf() = default; VecBuf(std::vector<int8_t> v) { vec = v; } }; std::vector<int8_t> data{ 10, 20, 30 }; class Detacher { public: template<typename Function, typename ... Args> void createTask(Function &&func, Args&& ... args) { m_threads.emplace_back(std::forward<Function>(func), std::forward<Args>(args)...); } Detacher() = default; Detacher(const Detacher&) = delete; Detacher & operator=(const Detacher&) = delete; Detacher(Detacher&&) = default; Detacher& operator=(Detacher&&) = default; ~Detacher() { for (auto& thread : m_threads) { thread.join(); } } private: std::vector<std::thread> m_threads; }; void foo_1(boost::circular_buffer<VecBuf> *cb) { while (mRunning) { std::unique_lock<std::mutex> mlock(m_mutex); m_condVar.wait(mlock, [=]() { return !cb->empty(); }); VecBuf local_data(cb->front()); cb->pop_front(); mlock.unlock(); if (!mRunning) { break; } //simulate time consuming function call and consume local_data here std::this_thread::sleep_for(std::chrono::milliseconds(16)); } while (cb->size()) { VecBuf local_data(cb->front()); cb->pop_front(); if (!mRunning) { break; } } } void foo_2(boost::circular_buffer<VecBuf> *cb) { while (mRunning) { std::unique_lock<std::mutex> mlock(m_mutex); while (cb->full()) { mlock.unlock(); /* can we do better than this? */ std::this_thread::sleep_for(std::chrono::milliseconds(100)); mlock.lock(); } cb->push_back(VecBuf(data)); m_condVar.notify_one(); } } int main() { mRunning = true; boost::circular_buffer<VecBuf> cb(100); Detacher thread_1; thread_1.createTask(foo_1, &cb); Detacher thread_2; thread_2.createTask(foo_2, &cb); std::this_thread::sleep_for(std::chrono::milliseconds(20000)); mRunning = false; }
Odpowiedź
/* can we do better than this? */
W kontekście bufora cyklicznego, standardowym sposobem uniknięcia zajętego czekania jest posiadanie dwóch semaforów. Po pierwsze, aby zablokować producenta, gdy bufor jest pełny, a po drugie, aby zablokować konsumenta, gdy bufor jest pusty. Gdy proces przejdzie przez swój semafor i wykona swoją pracę, powinien zasygnalizować peerowi.
Bufor cykliczny jest dobry, gdy konsument tylko czasami się spóźnia i nie można pozwolić sobie na utratę danych. W Twojej sytuacji wygląda to na złe rozwiązanie: producent zostaje zdławiony przez tempo zużycia, a konsumentowi są przedstawione nieaktualne dane.
Typową odpowiedzią jest pozwolenie producentowi na pracę z pełną prędkością i potrójne buforowanie produkcji (przynajmniej gwarantuje to, że konsument otrzyma najnowsze dane). Proszę wybaczyć bezwstydną autopromocję .
Komentarze
- dziękuję za recenzję komentarz i link. Jednak w moim przypadku konsument blokuje się tylko na jakiś czas. Myślę, że nadal ma sens używanie semaforów zamiast mojego obecnego rozwiązania przy użyciu zmiennej warunku + blokada.
- @nomanpouigt " jakiś czas " jest zupełnie inny niż " zawsze spóźniony ".
- przepraszam, mam na myśli bufor pierścienia rozmiar dobiera się w taki sposób, aby był w stanie dostosować się do opóźnienia. Zastanawiam się, że w c ++ nie ma semaforów i jest zaimplementowana przy użyciu muteksów i zmiennej warunkowej, więc jeśli ma sens używanie semaforów.
- Czy możesz również opracować swoje rozwiązanie potrójnego bufora i jak by to miało zastosowanie w moim przypadku ? Czy masz na myśli, że ' nie blokuj po stronie producenta i pozwól mu działać, a bufor producenta musi być buforowany potrójnie, aby konsument mógł uzyskać najnowsze dane w przypadku przekroczenia?