반응형

 이번 글에서는 std::mutex와 std::condition_variable를 사용해서 멀티 스레드에 안전한 queue를 예제로 

 구현해봅니다.

NotifyQueue 클래스 

/////////////////////////////////////////////////////////////////////////
// NotifyQueue는 queue에 데이터를 넣을 때 condition_variable로 notify를 전송합니다. 
// queue에 데이터를 가져올 때 queue가 비어 있다면 condition_variable를 통해서 대기합니다.
template<typename OBJ>
class NotifyQueue {
public:
	using Container = std::queue<OBJ>;
	NotifyQueue() :is_run_(true) {};
	virtual ~NotifyQueue() {};

	// 컨테이너가 비어 있는지 확인합니다. 							
	// 데이터 수정이 없기 때문에 shared lock모드를 사용합니다.							
	bool Empty() {
		std::shared_lock<std::shared_mutex> sl(sm_);
		return con_.empty();
	}

	// OBJ를 컨테이너에서 추출합니다. 							
	// 데이터가 없다면 condition_variable을 통해서 대기 합니다.							
	bool Pop(OBJ& obj) {
		std::unique_lock<std::shared_mutex> ul(sm_);
		cv_.wait(ul, [this] {return (!con_.empty() || !is_run_); });

		if (!is_run_) { return false; }

		if(!con_.empty()) {
			obj = con_.front();
			con_.pop();
			return true;
		}
		return false;
	}

	// 데이터를 컨테이너에 넣습니다. 							
	// condition_variable 를 통해서 통지합니다.							
	void Push(const OBJ& obj) {
		{
			std::unique_lock<std::shared_mutex> sl(sm_);
			con_.push(obj);
		}
		cv_.notify_one();
	}

	// 모든 대기 스레드에 통지를 전달합니다.							
	void NotifyAll()
	{
		cv_.notify_all();
	}

	// 컨테이너의 크기를 확인합니다. 							
	// 데이터 수정이 없기 때문에 shared lock모드를 사용합니다.							
	size_t Size() {
		std::shared_lock<std::shared_mutex> sl(sm_);
		return con_.size();
	}

	bool IsRun() { return is_run_; }
	void SetStop() {
    	std::unique_lock<std::shared_mutex> ul(sm_);
		is_run_ = false;
		NotifyAll();
	}
private:
	Container						con_;
	std::condition_variable_any		cv_;
	std::shared_mutex				sm_;
	bool							is_run_;
};

 NotifyQueue 클래스는 std::shared_mutex와 std::condition_variable_any를 사용해서 queue를 구현합니다. 

 Empty()와 Size()는 데이터 수정이 없기 때문에 shared_lock을 사용해서 공유 모드 잠금을 사용합니다. 

 Pop()과 Push()의 경우 데이터 수정이 있기 때문에 unique_lock을 사용해서 배타적 잠금을 사용합니다. 

Push함수 동작 방식

Push 동작 방식

 push를 수행하면 mutex를 통해서 락을 획득 한 후에 데이터를 추가하고 조건 변수를 통해서 알림을 전송합니다. 

Pop() 함수 동작 방식 

Pop() 함수 동작 방식 

 Pop()함수를 수행하면 Queue에 데이터가 있다면 데이터를 추출하고 데이터가 없다면 데이터가 들어 올 때까지

 조건 변수를 통해서 대기합니다. 

 

CommonNotifyQueue 클래스

/////////////////////////////////////////////////////////////////////////
// CommonNotifyQueue는 NotifyQueue 일반적인 사용법에 대한 정의를 구현합니다. 
// 사용자는 해당 클래스를 상속받아서 RunExecute 구현한다면 손쉽게 사용 할 수 있습니다. 

template<typename OBJ>
class CommonNotifyQueue {
public:
	CommonNotifyQueue(size_t run_thread_count = 1) : pfexecute_(nullptr) {
		pnotify_ = std::make_shared<NotifyQueue<OBJ>>();
		pproducer_ = std::make_shared<Producer<OBJ>>(pnotify_);
		pconsumer_ = std::make_shared<Consumer<OBJ>>(pnotify_);

		for (int i = 0; i < run_thread_count; i++) {
			Run();
		}
	}
	virtual ~CommonNotifyQueue() {}

	void Push(OBJ&& obj) {
		Push(obj);
	}

	void Push(const OBJ& obj) {
		pproducer_->Push(obj);
	}

	virtual void RunExecute(const OBJ& obj) = 0;

	void Run() {
		thread_group_.emplace_back([this] {
			pconsumer_->Execute([this](const OBJ& obj) {
				RunExecute(obj);
				});
			});
	}

	void Stop() {
		pnotify_->SetStop();
	}

	void Join() {
		for (auto& t : thread_group_) {
			t.join();
		}
	}

private:
	std::shared_ptr<NotifyQueue<OBJ>>	pnotify_;
	std::shared_ptr <Producer<OBJ>>		pproducer_;
	std::shared_ptr <Consumer<OBJ>>		pconsumer_;
	std::vector<std::thread>			thread_group_;
	std::function<void(const OBJ&)>		pfexecute_;
};

 CommonNotifyQueue 클래스는 사용자가 멀티 스레드 환경에서 NotifyQueue클래스를 손쉽게 사용할 수 있도록 

 구현한 클래스입니다.

 사용자는 NotifyQueue클래스를 자유롭게 사용할 수 있지만 그렇게 하기 위해서는 내부 구현에 대해서 이해가 필요하기

 때문에 접근성이 떨어집니다. 단순히 생산자 및 소비자 패턴의 형태로 NotifyQueue클래스를 사용을 원하는 사용자는 

 백그라운드 스레드 갯수와 및 RunExecute()함수만 구현해서 손쉽게 접근 할 수 있습니다. 

최종 구현 예제

#include <iostream>								
#include <mutex>								
#include <thread>								
#include <functional>
#include <queue>								
#include <shared_mutex>								
#include <chrono>	
#include <string>
#include <condition_variable>

using namespace std;					

/////////////////////////////////////////////////////////////////////////
// NotifyQueue는 queue에 데이터를 넣을 때 condition_variable로 notify를 전송합니다. 
// queue에 데이터를 가져올 때 queue가 비어 있다면 condition_variable를 통해서 대기합니다.
template<typename OBJ>
class NotifyQueue {
public:
	using Container = std::queue<OBJ>;
	NotifyQueue() :is_run_(true) {
	};
	virtual ~NotifyQueue() {};

	// 컨테이너가 비어 있는지 확인합니다. 							
	// 데이터 수정이 없기 때문에 shared lock모드를 사용합니다.							
	bool Empty() {
		std::shared_lock<std::shared_mutex> sl(sm_);
		return con_.empty();
	}

	// OBJ를 컨테이너에서 추출합니다. 							
	// 데이터가 없다면 condition_variable을 통해서 대기 합니다.							
	bool Pop(OBJ& obj) {
		std::unique_lock<std::shared_mutex> ul(sm_);
		cv_.wait(ul, [this] {return (!con_.empty() || !is_run_); });

		if (!is_run_) { return false; }

		if(!con_.empty()) {
			obj = con_.front();
			con_.pop();
			return true;
		}
		return false;
	}

	// 데이터를 컨테이너에 넣습니다. 							
	// condition_variable 를 통해서 통지합니다.							
	void Push(const OBJ& obj) {
		{
			std::unique_lock<std::shared_mutex> sl(sm_);
			con_.push(obj);
		}
		cv_.notify_one();
	}

	// 모든 대기 스레드에 통지를 전달합니다.							
	void NotifyAll()
	{
		cv_.notify_all();
	}

	// 컨테이너의 크기를 확인합니다. 							
	// 데이터 수정이 없기 때문에 shared lock모드를 사용합니다.							
	size_t Size() {
		std::shared_lock<std::shared_mutex> sl(sm_);
		return con_.size();
	}

	bool IsRun() { return is_run_; }
	void SetStop() {
		is_run_ = false;
		NotifyAll();
	}
private:
	Container						con_;
	std::condition_variable_any		cv_;
	std::shared_mutex				sm_;
	bool							is_run_;
};

/////////////////////////////////////////////////////////////////////////
// NotifyQueue를 사용하는 생산자 클래스를 생성합니다. 
template<typename OBJ>
class Producer {
public:
	using obj = OBJ;
	Producer(std::shared_ptr<NotifyQueue<OBJ>> pnotify_queue) : pnotify_queue_(pnotify_queue) {
	}

	void Push(const OBJ& str) {
		pnotify_queue_->Push(str);
	}

private:
	std::shared_ptr<NotifyQueue<OBJ>> pnotify_queue_;

};

/////////////////////////////////////////////////////////////////////////
// NotifyQueue를 사용하는 소비자 클래스를 생성합니다. 
template<typename OBJ>
class Consumer {
public:
	using MyProducer = Producer<std::string>;
	Consumer(std::shared_ptr<NotifyQueue<OBJ>> pnotify_queue) : pnotify_queue_(pnotify_queue)
	{

	}

	void Execute(std::function<void(const OBJ& obj)> phandle) {
		do {
			OBJ obj;
			auto bsuccess = pnotify_queue_->Pop(obj);
			if (bsuccess) phandle(obj);
		} while (pnotify_queue_->IsRun());
	}
private:
	std::shared_ptr<NotifyQueue<OBJ>> pnotify_queue_;
};


/////////////////////////////////////////////////////////////////////////
// CommonNotifyQueue는 NotifyQueue 일반적인 사용법에 대한 정의를 구현합니다. 
// 사용자는 해당 클래스를 상속받아서 RunExecute 구현한다면 손쉽게 사용 할 수 있습니다. 

template<typename OBJ>
class CommonNotifyQueue {
public:
	CommonNotifyQueue(size_t run_thread_count = 1) : pfexecute_(nullptr) {
		pnotify_ = std::make_shared<NotifyQueue<OBJ>>();
		pproducer_ = std::make_shared<Producer<OBJ>>(pnotify_);
		pconsumer_ = std::make_shared<Consumer<OBJ>>(pnotify_);

		for (int i = 0; i < run_thread_count; i++) {
			Run();
		}
	}
	virtual ~CommonNotifyQueue() {}

	void Push(OBJ&& obj) {
		Push(obj);
	}

	void Push(const OBJ& obj) {
		pproducer_->Push(obj);
	}

	virtual void RunExecute(const OBJ& obj) = 0;

	void Run() {
		thread_group_.emplace_back([this] {
			pconsumer_->Execute([this](const OBJ& obj) {
				RunExecute(obj);
				});
			});
	}

	void Stop() {
		pnotify_->SetStop();
	}

	void Join() {
		for (auto& t : thread_group_) {
			t.join();
		}
	}

private:
	std::shared_ptr<NotifyQueue<OBJ>>	pnotify_;
	std::shared_ptr <Producer<OBJ>>		pproducer_;
	std::shared_ptr <Consumer<OBJ>>		pconsumer_;
	std::vector<std::thread>			thread_group_;
	std::function<void(const OBJ&)>		pfexecute_;
};


///=====================================================								
// CommonNotifyQueue를 사용해서 std::string을 컨테이너로 전달받아서 멀티 스레드로 출력하는 예제를 만들어 봅니다. 
class PrintMsgManager : public CommonNotifyQueue<std::string> {
public:
	using Object = std::string;

	PrintMsgManager(size_t run_thread_count = 1) : CommonNotifyQueue<std::string>(run_thread_count) {
	}
	virtual ~PrintMsgManager() {}

	void RunExecute(const std::string& obj) override {
		std::cout << "ThreadId[" << std::this_thread::get_id() << "] Processing ==> " << obj.c_str() << "\n";
	}
};

int main() {
	using namespace std::chrono_literals;

	// 백그라운드 스레드 4개로 시작합니다. 							
	PrintMsgManager printManager(4);
	std::cout << "Start PrintMsgManager Back Thread 4" << std::endl;

	for (int i = 1; i <= 5000; i++) {
		printManager.Push(std::to_string(i));
	}

	std::this_thread::sleep_for(7s);

	printManager.Stop();
	std::cout << "Notify Stop signal" << std::endl;
	std::this_thread::sleep_for(1s);

	printManager.Join();

	std::cout << "All thread Joinable...." << std::endl;
	std::cout << "After 3 sec Application Exit " << std::endl;
	std::this_thread::sleep_for(3s);


	return 0;
}
반응형

'c++ > c++' 카테고리의 다른 글

[c++] 반환값 최적화 (RVO, return-value-optimization)  (0) 2024.09.26
[c++] find_if  (0) 2024.09.14
[c++] 키워드  (0) 2020.03.08
[c++] vector  (0) 2019.10.03
[c++] chrono를 사용한 수행 시간 출력 클래스  (0) 2019.09.20

+ Recent posts