Señalización de cola de tamaño fijo de C ++ llena / vacía

Estoy considerando una aplicación en la que se deben escribir datos grandes en un archivo con frecuencia. Me gustaría usar una cola y tener un productor y un consumidor ejecutándose en diferentes subprocesos. Además, me gustaría tener una cola de tamaño fijo ya que el tamaño de los datos puede ser muy grande. He implementado una prueba simple de boost :: lockfree :: queue con boost :: condition_variable para señalar el estado de la cola. Me gustaría evitar el mutex, pero para la excepción en la que la cola está llena (bloque de productor) o vacía (bloque de consumidor)

Me gustaría saber (en riesgo de opinión basada …) si » Estoy usando los condicionales correctamente o si hay un problema de rendimiento, en comparación con el uso de otros métodos. Esto es lo que «he hecho hasta ahora (datos pequeños)

#include <iostream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #define N 6 #define QN 3 struct testdata { int a; int b; }; boost::lockfree::queue<testdata, boost::lockfree::fixed_size<true>> que(QN); boost::condition_variable que_has_data, que_has_room; boost::mutex que_mtx_hd, que_mtx_hr; void producer(void) { testdata td; int i = 0; boost::mutex::scoped_lock lock(que_mtx_hr); boost::this_thread::sleep(boost::posix_time::seconds(1)); for (;;) { td.a = i; td.b = i + 1; if (!que.push(td)) { std::cout << "producer waiting" << std::endl; que_has_room.wait(lock); } else { std::cout << "pushed " << td.a << std::endl; i += 1; que.has_data_notify_one(); } if (i > N) break; } } void consumer(void) { testdata td; boost::mutex::scoped_lock lock(que_mtx_hd); for (;;) { if (que.pop(td)) { std::cout << "popped " << td.a << std::endl; if (td.a == N) break; que_has_room.notify_one(); } else { std::cout << "consumer waiting" << std::endl; que_has_data.wait(lock); } } boost::this_thread::sleep(boost::posix_time::seconds(1)); } int main(void) { boost::thread t1(&producer); boost::thread t2(&consumer); t1.join(); t2.join(); return 0; } 

Esto funciona (salida):

consumer waiting pushed 0 pushed 1 pushed 2 producer waiting popped 0 pushed 3 producer waiting popped 1 pushed 4 producer waiting popped 2 pushed 5 producer waiting popped 3 pushed 6 popped 4 popped 5 popped 6 

Anticipo que, en su mayor parte, los datos casi siempre estarán disponibles, pero quiero bloquear en el caso de congestión ( escritura de archivos, red, etc.). El interés en el tamaño fijo es la preocupación por los conjuntos de datos masivos y la asignación dinámica en la cola.

(Esto es más un experimento de lo que se puede hacer. En realidad, mis datos se actualizan como máximo alrededor de 20 Hz así que simplemente bloquear un std :: queue cuyo tamaño administro funcionará muy bien, también.)

Respuesta

Use un semáforo para hacer que los productores duerman cuando la cola está llena, y otro semáforo para hacer que los consumidores duerman cuando la cola está vacía. cuando la cola no está llena ni vacía, las operaciones sem_post y sem_wait no son bloqueantes (en los kernels más nuevos)

#include <semaphore.h> template<typename lock_free_container> class blocking_lock_free { public: lock_free_queue_semaphore(size_t n) : container(n) { sem_init(&pop_semaphore, 0, 0); sem_init(&push_semaphore, 0, n); } ~lock_free_queue_semaphore() { sem_destroy(&pop_semaphore); sem_destroy(&push_semaphore); } bool push(const lock_free_container::value_type& v) { sem_wait(&push_semaphore); bool ret = container::bounded_push(v); ASSERT(ret); if (ret) sem_post(&pop_semaphore); else sem_post(&push_semaphore); // shouldn"t happen return ret; } bool pop(lock_free_container::value_type& v) { sem_wait(&pop_semaphore); bool ret = container::pop(v); ASSERT(ret); if (ret) sem_post(&push_semaphore); else sem_post(&pop_semaphore); // shouldn"t happen return ret; } private: lock_free_container container; sem_t pop_semaphore; sem_t push_semaphore; }; 

Responder

Sería muy escéptico sobre el código que contiene un contenedor sin bloqueo, dos exclusiones mutuas y una variable condicional para implementar la cola de bloqueo entre subprocesos. Sin siquiera mirar más allá.

Probablemente comenzaría con el prototipo a continuación (tal vez primero verificando si boost :: interprocess tiene algo que pueda usar de inmediato):

  1. wrap boost::circular_buffer en facebook/Folly/Synchronized pero con un casillero personalizado que hace try_lock(), luego gira 41 veces más con try_lock(), luego bloquea en lock(), contando las ocurrencias de los tres escenarios y con notificación / espera en la parte superior
  2. Lo lanzaría a producción en modo piloto y verificaría si realmente necesito molestarme con un contenedor sin candados.

Comentarios

  • Yo ‘ no sigo muy bien su pseudocódigo, pero comprendo las preocupaciones que tiene. Había calculado que el uso de scoped_lock haría que esto estuviera bien, y que cada uno lo requiere ‘ s mutex para la espera condicional.

Respuesta

Boost proporciona una cola de un solo productor y un solo consumidor que no tiene bloqueos, creo, ¿lo conoce? Creo que se ajusta exactamente a su caso de uso.

http://www.boost.org/doc/libs/1_61_0/doc/html/boost/lockfree/spsc_queue.html

Puede usar un tamaño fijo y habrá bloqueado al consumidor si no hay datos disponibles, etc. sin implementarlo usted mismo.

Comentarios

  • Lo hice observe esto, pero mi máquina (no en red) parecía no tener esto. Dicho esto, creo que pop y push funcionan de la misma manera, sin bloquear el retorno de bool, ya sea exitoso o no.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *