Signalisation de file dattente de taille fixe C ++ pleine / vide

Jenvisage une application où de grandes données doivent être écrites souvent dans un fichier. Je voudrais utiliser une file dattente et avoir un producteur et un consommateur fonctionnant sur différents threads. De plus, jaimerais avoir une file dattente de taille fixe car la taille des données peut être très grande. Jai implémenté un test simple de boost :: lockfree :: queue avec boost :: condition_variable pour signaler létat de la file dattente. Je voudrais éviter le mutex mais pour lexception où la file dattente est pleine (producteur de bloc) ou vide (consommateur de bloc)

Je voudrais savoir (au risque dopinion basée ..) si je  » m en utilisant correctement les conditions ou sil y a un problème de performances – par rapport à lutilisation dautres méthodes. Voici ce que jai fait jusquà présent (petites données)

#include <iostream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #define N 6 #define QN 3 struct testdata { int a; int b; }; boost::lockfree::queue<testdata, boost::lockfree::fixed_size<true>> que(QN); boost::condition_variable que_has_data, que_has_room; boost::mutex que_mtx_hd, que_mtx_hr; void producer(void) { testdata td; int i = 0; boost::mutex::scoped_lock lock(que_mtx_hr); boost::this_thread::sleep(boost::posix_time::seconds(1)); for (;;) { td.a = i; td.b = i + 1; if (!que.push(td)) { std::cout << "producer waiting" << std::endl; que_has_room.wait(lock); } else { std::cout << "pushed " << td.a << std::endl; i += 1; que.has_data_notify_one(); } if (i > N) break; } } void consumer(void) { testdata td; boost::mutex::scoped_lock lock(que_mtx_hd); for (;;) { if (que.pop(td)) { std::cout << "popped " << td.a << std::endl; if (td.a == N) break; que_has_room.notify_one(); } else { std::cout << "consumer waiting" << std::endl; que_has_data.wait(lock); } } boost::this_thread::sleep(boost::posix_time::seconds(1)); } int main(void) { boost::thread t1(&producer); boost::thread t2(&consumer); t1.join(); t2.join(); return 0; } 

Cela fonctionne (sortie):

consumer waiting pushed 0 pushed 1 pushed 2 producer waiting popped 0 pushed 3 producer waiting popped 1 pushed 4 producer waiting popped 2 pushed 5 producer waiting popped 3 pushed 6 popped 4 popped 5 popped 6 

Je prévois que pour la plupart, les données seront presque toujours disponibles mais je veux bloquer en cas de congestion ( écriture de fichier, réseau, etc.). Lintérêt de la taille fixe est linquiétude des ensembles de données massifs et de lallocation dynamique dans la file dattente –

(Ceci est plus une expérience dans ce qui peut être fait. En réalité, mes données sont mises à jour au plus à environ 20 Hz donc le simple fait de verrouiller une file std :: queue dont je gère la taille fonctionnera très bien aussi.)

Réponse

Utilisez un sémaphore pour faire dormir les producteurs lorsque la file dattente est pleine, et un autre sémaphore pour faire dormir les consommateurs lorsque la file dattente est vide. lorsque la file dattente nest ni pleine ni vide, les opérations sem_post et sem_wait sont non bloquantes (dans les noyaux plus récents)

#include <semaphore.h> template<typename lock_free_container> class blocking_lock_free { public: lock_free_queue_semaphore(size_t n) : container(n) { sem_init(&pop_semaphore, 0, 0); sem_init(&push_semaphore, 0, n); } ~lock_free_queue_semaphore() { sem_destroy(&pop_semaphore); sem_destroy(&push_semaphore); } bool push(const lock_free_container::value_type& v) { sem_wait(&push_semaphore); bool ret = container::bounded_push(v); ASSERT(ret); if (ret) sem_post(&pop_semaphore); else sem_post(&push_semaphore); // shouldn"t happen return ret; } bool pop(lock_free_container::value_type& v) { sem_wait(&pop_semaphore); bool ret = container::pop(v); ASSERT(ret); if (ret) sem_post(&push_semaphore); else sem_post(&pop_semaphore); // shouldn"t happen return ret; } private: lock_free_container container; sem_t pop_semaphore; sem_t push_semaphore; }; 

Réponse

Je serais très sceptique quant au code contenant un conteneur sans verrou, deux mutex et une variable conditionnelle pour implémenter la file dattente de blocage interthread. Sans même chercher plus loin.

Je partirais probablement du prototype ci-dessous (peut-être dabord vérifier si boost :: interprocess a quelque chose que je pourrais utiliser tout de suite):

  1. wrap boost::circular_buffer dans facebook/Folly/Synchronized mais avec un casier personnalisé qui fait try_lock(), puis tourne encore 41 fois avec try_lock(), puis bloque sur lock(), en comptant les occurrences des trois scénarios, et avec notification / attente en haut
  2. Je publierais cela en production en mode pilote et vérifierais si je dois vraiment membêter avec un conteneur sans verrou.

Commentaires

  • Je ‘ ne suit pas très bien votre pseudo-code mais je comprends vos inquiétudes. Javais estimé que lutilisation de scoped_lock rendrait cela correct, et que chacun le requiert ‘ s mutex pour lattente conditionnelle.

Réponse

Boost fournit une file dattente à un seul producteur et à un seul consommateur, sans verrouillage je pense, le savez-vous? Je pense que cela correspond exactement à votre cas dutilisation.

http://www.boost.org/doc/libs/1_61_0/doc/html/boost/lockfree/spsc_queue.html

Vous pouvez utiliser une taille fixe et vous aurez bloqué le consommateur si les données ne sont pas disponibles, etc. sans limplémenter vous-même.

Commentaires

  • Jai fait remarquez ceci mais ma machine (non en réseau) semblait manquer cela. Cela dit, je pense que le pop et le push fonctionnent de la même manière, les booléens retournant non bloquants avec succès ou non.

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *