저는 대용량 데이터를 파일에 자주 기록해야하는 애플리케이션을 고려하고 있습니다. 사용하고 싶습니다. 다른 스레드에서 실행되는 생산자와 소비자가 있습니다. 또한 데이터 크기가 매우 클 수 있으므로 고정 된 크기의 대기열을 갖고 싶습니다. boost :: lockfree :: queue with boost에 대한 간단한 테스트를 구현했습니다. :: condition_variable은 큐의 상태를 알립니다. 뮤텍스를 피하고 싶지만 대기열이 가득 차거나 (블록 생산자) 비어있는 경우 (블록 소비자) 예외에 대해 알고 싶습니다 (의견 기반 ..). m 조건을 적절하게 사용하거나 성능 문제가있는 경우-다른 방법을 사용하는 것과 비교합니다. 지금까지 수행 한 작업은 다음과 같습니다 (작은 데이터).
#include <iostream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #define N 6 #define QN 3 struct testdata { int a; int b; }; boost::lockfree::queue<testdata, boost::lockfree::fixed_size<true>> que(QN); boost::condition_variable que_has_data, que_has_room; boost::mutex que_mtx_hd, que_mtx_hr; void producer(void) { testdata td; int i = 0; boost::mutex::scoped_lock lock(que_mtx_hr); boost::this_thread::sleep(boost::posix_time::seconds(1)); for (;;) { td.a = i; td.b = i + 1; if (!que.push(td)) { std::cout << "producer waiting" << std::endl; que_has_room.wait(lock); } else { std::cout << "pushed " << td.a << std::endl; i += 1; que.has_data_notify_one(); } if (i > N) break; } } void consumer(void) { testdata td; boost::mutex::scoped_lock lock(que_mtx_hd); for (;;) { if (que.pop(td)) { std::cout << "popped " << td.a << std::endl; if (td.a == N) break; que_has_room.notify_one(); } else { std::cout << "consumer waiting" << std::endl; que_has_data.wait(lock); } } boost::this_thread::sleep(boost::posix_time::seconds(1)); } int main(void) { boost::thread t1(&producer); boost::thread t2(&consumer); t1.join(); t2.join(); return 0; }
작동합니다 (출력) :
consumer waiting pushed 0 pushed 1 pushed 2 producer waiting popped 0 pushed 3 producer waiting popped 1 pushed 4 producer waiting popped 2 pushed 5 producer waiting popped 3 pushed 6 popped 4 popped 5 popped 6
대부분의 경우 데이터를 거의 항상 사용할 수있을 것으로 예상되지만 혼잡시 차단하고 싶습니다 ( 파일 쓰기, 네트워크 등). 고정 크기에 대한 관심은 대량 데이터 세트와 대기열의 동적 할당에 대한 걱정입니다.-
(이것은 수행 할 수있는 작업에 대한 실험입니다. 실제로 내 데이터는 최대 약 20Hz로 업데이트됩니다. 따라서 크기를 관리하는 std :: queue를 잠그는 것도 잘 작동합니다.)
Answer
세마포어를 사용하여 큐가 가득 차면 생산자를 휴면 상태로 만들고 다른 세마포어를 사용하여 큐가 비어있을 때 소비자를 휴면 상태로 만듭니다. 대기열이 꽉 찼거나 비어 있지 않으면 sem_post 및 sem_wait 작업이 비 차단입니다 (최신 커널에서)
#include <semaphore.h> template<typename lock_free_container> class blocking_lock_free { public: lock_free_queue_semaphore(size_t n) : container(n) { sem_init(&pop_semaphore, 0, 0); sem_init(&push_semaphore, 0, n); } ~lock_free_queue_semaphore() { sem_destroy(&pop_semaphore); sem_destroy(&push_semaphore); } bool push(const lock_free_container::value_type& v) { sem_wait(&push_semaphore); bool ret = container::bounded_push(v); ASSERT(ret); if (ret) sem_post(&pop_semaphore); else sem_post(&push_semaphore); // shouldn"t happen return ret; } bool pop(lock_free_container::value_type& v) { sem_wait(&pop_semaphore); bool ret = container::pop(v); ASSERT(ret); if (ret) sem_post(&push_semaphore); else sem_post(&pop_semaphore); // shouldn"t happen return ret; } private: lock_free_container container; sem_t pop_semaphore; sem_t push_semaphore; };
Answer
잠금없는 컨테이너, 두 개의 뮤텍스 및 스레드 간 차단 대기열을 구현하는 조건부 변수를 포함하는 코드에 대해 매우 회의적입니다. 더 이상 보지 않고.
아마도 아래 프로토 타입부터 시작할 것입니다 (boost :: interprocess에 바로 사용할 수있는 것이 있는지 먼저 확인) :
- wrap
boost::circular_buffer
를facebook/Folly/Synchronized
로 입력하지만try_lock()
를 수행 한 다음try_lock()
, 그런 다음lock()
에서 차단, 세 가지 시나리오의 발생 횟수 계산, 알림 / 대기 맨 위에 있음 - 파일럿 모드에서 프로덕션에 릴리스하고 잠금없는 컨테이너를 정말로 사용해야하는지 확인합니다.
댓글
- ' 귀하의 의사 코드를 잘 따르지 않지만 우려하시는 점은 이해합니다. 나는 scoped_lock을 사용하면 이것이 괜찮을 것이라고 생각했고, 각각 조건부 대기를 위해 '의 뮤텍스가 필요하다고 생각했습니다.
답변
Boost는 잠금이없는 단일 생산자 단일 소비자 대기열을 제공합니다. 알고 계십니까? 사용 사례에 정확히 맞는 것 같습니다.
http://www.boost.org/doc/libs/1_61_0/doc/html/boost/lockfree/spsc_queue.html
고정 된 크기를 사용할 수 있으며 데이터를 사용할 수없는 경우 사용자가 직접 구현하지 않고도 차단할 수 있습니다.
댓글
- 그렇습니다. 이것을 주목하지만 내 (네트워크 연결되지 않은) 컴퓨터에는 이것을 놓친 것 같습니다. 즉, 성공 여부에 관계없이 팝과 푸시가 동일하게 작동하고 차단하지 않고 bool을 반환한다고 믿습니다.