Estou considerando um aplicativo em que dados grandes precisam ser gravados em arquivo com frequência. Gostaria de usar uma fila e ter um produtor e um consumidor em execução em threads diferentes. Além disso, gostaria de ter uma fila de tamanho fixo, pois o tamanho dos dados pode ser muito grande. Implementei um teste simples de boost :: lockfree :: queue with boost :: condição_variável para sinalizar o estado da fila. Eu gostaria de evitar o mutex, mas para a exceção em que a fila está cheia (produtor do bloco) ou vazia (consumidor do bloco)
Eu gostaria de saber (com risco de opinião ..) se eu ” Estou usando condicionais corretamente ou se houver um problema de desempenho – em comparação com o uso de outros métodos. Aqui está o que eu fiz até agora (dados pequenos)
#include <iostream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #define N 6 #define QN 3 struct testdata { int a; int b; }; boost::lockfree::queue<testdata, boost::lockfree::fixed_size<true>> que(QN); boost::condition_variable que_has_data, que_has_room; boost::mutex que_mtx_hd, que_mtx_hr; void producer(void) { testdata td; int i = 0; boost::mutex::scoped_lock lock(que_mtx_hr); boost::this_thread::sleep(boost::posix_time::seconds(1)); for (;;) { td.a = i; td.b = i + 1; if (!que.push(td)) { std::cout << "producer waiting" << std::endl; que_has_room.wait(lock); } else { std::cout << "pushed " << td.a << std::endl; i += 1; que.has_data_notify_one(); } if (i > N) break; } } void consumer(void) { testdata td; boost::mutex::scoped_lock lock(que_mtx_hd); for (;;) { if (que.pop(td)) { std::cout << "popped " << td.a << std::endl; if (td.a == N) break; que_has_room.notify_one(); } else { std::cout << "consumer waiting" << std::endl; que_has_data.wait(lock); } } boost::this_thread::sleep(boost::posix_time::seconds(1)); } int main(void) { boost::thread t1(&producer); boost::thread t2(&consumer); t1.join(); t2.join(); return 0; }
Isso funciona (saída):
consumer waiting pushed 0 pushed 1 pushed 2 producer waiting popped 0 pushed 3 producer waiting popped 1 pushed 4 producer waiting popped 2 pushed 5 producer waiting popped 3 pushed 6 popped 4 popped 5 popped 6
Eu prevejo que na maior parte, os dados quase sempre estarão disponíveis, mas quero bloquear em caso de congestionamento ( gravação de arquivo, rede, etc). O interesse no tamanho fixo é a preocupação com conjuntos de dados massivos e alocação dinâmica na fila –
(Isso é mais uma experiência do que pode ser feito. Na realidade, meus dados são atualizados no máximo cerca de 20 Hz então bloquear uma std :: queue cujo tamanho eu gerencio também funcionará muito bem.)
Resposta
Use um semáforo para fazer os produtores dormirem quando a fila estiver cheia, e outro semáforo para fazer os consumidores dormirem quando a fila estiver vazia. quando a fila não está cheia nem vazia, as operações sem_post e sem_wait não bloqueiam (nos kernels mais recentes)
#include <semaphore.h> template<typename lock_free_container> class blocking_lock_free { public: lock_free_queue_semaphore(size_t n) : container(n) { sem_init(&pop_semaphore, 0, 0); sem_init(&push_semaphore, 0, n); } ~lock_free_queue_semaphore() { sem_destroy(&pop_semaphore); sem_destroy(&push_semaphore); } bool push(const lock_free_container::value_type& v) { sem_wait(&push_semaphore); bool ret = container::bounded_push(v); ASSERT(ret); if (ret) sem_post(&pop_semaphore); else sem_post(&push_semaphore); // shouldn"t happen return ret; } bool pop(lock_free_container::value_type& v) { sem_wait(&pop_semaphore); bool ret = container::pop(v); ASSERT(ret); if (ret) sem_post(&push_semaphore); else sem_post(&pop_semaphore); // shouldn"t happen return ret; } private: lock_free_container container; sem_t pop_semaphore; sem_t push_semaphore; };
Resposta
Eu ficaria muito cético sobre o código que contém um contêiner sem bloqueio, dois mutexes e uma variável condicional para implementar a fila de bloqueio entre threads. Sem nem mesmo olhar mais longe.
Eu provavelmente começaria pelo protótipo abaixo (talvez primeiro verificando se boost :: interprocess tem algo que eu pudesse usar imediatamente):
- wrap
boost::circular_buffer
emfacebook/Folly/Synchronized
, mas com um armário personalizado que faztry_lock()
, depois gira mais 41 vezes comtry_lock()
, em seguida, bloqueia emlock()
, contando ocorrências de todos os três cenários e com notificação / espera no topo - Eu o liberaria para produção em modo piloto e verificaria se realmente preciso me preocupar com um contêiner sem bloqueio.
O Boost fornece uma fila de produtor único e consumidor único que não tem bloqueio, eu acho, você sabe sobre isso? Acho que se encaixa exatamente no seu caso de uso.
http://www.boost.org/doc/libs/1_61_0/doc/html/boost/lockfree/spsc_queue.html
Você pode usar tamanho fixo e bloqueará o consumidor, se não houver dados disponíveis, etc., sem implementá-lo sozinho.
Comentários