Productor consumidor con hilos y usando boost ring buffer

Tengo dos hilos, uno es el productor y el otro es el consumidor. Mi consumidor siempre llega tarde (debido a una llamada de función costosa, simulada en el siguiente código usando sleeps), así que he usado el búfer de anillo ya que puedo permitirme perder algunos eventos.

Estoy mirando para ver si mi bloqueo se ve bien y comentarios generales de revisión de C ++.

#include <iostream> #include <thread> #include <chrono> #include <vector> #include <atomic> #include <boost/circular_buffer.hpp> #include <condition_variable> #include <functional> std::atomic<bool> mRunning; std::mutex m_mutex; std::condition_variable m_condVar; class VecBuf { private: std::vector<int8_t> vec; public: VecBuf() = default; VecBuf(std::vector<int8_t> v) { vec = v; } }; std::vector<int8_t> data{ 10, 20, 30 }; class Detacher { public: template<typename Function, typename ... Args> void createTask(Function &&func, Args&& ... args) { m_threads.emplace_back(std::forward<Function>(func), std::forward<Args>(args)...); } Detacher() = default; Detacher(const Detacher&) = delete; Detacher & operator=(const Detacher&) = delete; Detacher(Detacher&&) = default; Detacher& operator=(Detacher&&) = default; ~Detacher() { for (auto& thread : m_threads) { thread.join(); } } private: std::vector<std::thread> m_threads; }; void foo_1(boost::circular_buffer<VecBuf> *cb) { while (mRunning) { std::unique_lock<std::mutex> mlock(m_mutex); m_condVar.wait(mlock, [=]() { return !cb->empty(); }); VecBuf local_data(cb->front()); cb->pop_front(); mlock.unlock(); if (!mRunning) { break; } //simulate time consuming function call and consume local_data here std::this_thread::sleep_for(std::chrono::milliseconds(16)); } while (cb->size()) { VecBuf local_data(cb->front()); cb->pop_front(); if (!mRunning) { break; } } } void foo_2(boost::circular_buffer<VecBuf> *cb) { while (mRunning) { std::unique_lock<std::mutex> mlock(m_mutex); while (cb->full()) { mlock.unlock(); /* can we do better than this? */ std::this_thread::sleep_for(std::chrono::milliseconds(100)); mlock.lock(); } cb->push_back(VecBuf(data)); m_condVar.notify_one(); } } int main() { mRunning = true; boost::circular_buffer<VecBuf> cb(100); Detacher thread_1; thread_1.createTask(foo_1, &cb); Detacher thread_2; thread_2.createTask(foo_2, &cb); std::this_thread::sleep_for(std::chrono::milliseconds(20000)); mRunning = false; } 

Responder

 /* can we do better than this? */ 

En un contexto de búfer circular, la forma estándar de evitar la espera ocupada es tener dos semáforos. Primero para bloquear a un productor cuando un búfer está lleno y segundo para bloquear a un consumidor cuando el búfer está vacío. Una vez que un proceso pasa su semáforo y hace su trabajo, debería señalar al par.


El búfer circular es bueno cuando el consumidor solo a veces llega tarde y usted no puede permitirse perder datos. En su situación, parece una solución incorrecta: el productor se ve estrangulado por la tasa de consumo y al consumidor se le presentan los datos obsoletos.

Una respuesta típica es dejar que el productor funcione a toda velocidad y triple buffer de la producción (al menos, garantiza que el consumidor obtenga los datos producidos más recientemente). Perdone la autopromoción descarada.

Comentarios

  • gracias por la revisión comentario y el enlace. Sin embargo, en mi caso, mi consumidor solo bloquea un tiempo. Creo que todavía tiene sentido usar semáforos en lugar de mi solución actual usando variable de condición + bloqueo.
  • @nomanpouigt " alguna vez " es bastante diferente de " siempre tarde ".
  • lo siento, me refiero al búfer de anillo El tamaño se elige de tal manera que pueda adaptarse al retraso. Me pregunto que en c ++ no hay semáforos y se implementa usando mutex y variable de condición, por lo que si tiene sentido usar semáforos.
  • ¿Puede también elaborar su solución de búfer triple y cómo se aplicaría en mi caso? ? ¿Quiere decir que no ' no haga ningún bloqueo en el lado del productor y deje que se ejecute y el búfer del productor debe tener un búfer triple para que el consumidor pueda obtener datos recientes en caso de desbordes?

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *