반응형
이번 글에서는 std::mutex와 std::condition_variable를 사용해서 멀티 스레드에 안전한 queue를 예제로
구현해봅니다.
NotifyQueue 클래스
/////////////////////////////////////////////////////////////////////////
// NotifyQueue는 queue에 데이터를 넣을 때 condition_variable로 notify를 전송합니다.
// queue에 데이터를 가져올 때 queue가 비어 있다면 condition_variable를 통해서 대기합니다.
template<typename OBJ>
class NotifyQueue {
public:
using Container = std::queue<OBJ>;
NotifyQueue() :is_run_(true) {};
virtual ~NotifyQueue() {};
// 컨테이너가 비어 있는지 확인합니다.
// 데이터 수정이 없기 때문에 shared lock모드를 사용합니다.
bool Empty() {
std::shared_lock<std::shared_mutex> sl(sm_);
return con_.empty();
}
// OBJ를 컨테이너에서 추출합니다.
// 데이터가 없다면 condition_variable을 통해서 대기 합니다.
bool Pop(OBJ& obj) {
std::unique_lock<std::shared_mutex> ul(sm_);
cv_.wait(ul, [this] {return (!con_.empty() || !is_run_); });
if (!is_run_) { return false; }
if(!con_.empty()) {
obj = con_.front();
con_.pop();
return true;
}
return false;
}
// 데이터를 컨테이너에 넣습니다.
// condition_variable 를 통해서 통지합니다.
void Push(const OBJ& obj) {
{
std::unique_lock<std::shared_mutex> sl(sm_);
con_.push(obj);
}
cv_.notify_one();
}
// 모든 대기 스레드에 통지를 전달합니다.
void NotifyAll()
{
cv_.notify_all();
}
// 컨테이너의 크기를 확인합니다.
// 데이터 수정이 없기 때문에 shared lock모드를 사용합니다.
size_t Size() {
std::shared_lock<std::shared_mutex> sl(sm_);
return con_.size();
}
bool IsRun() { return is_run_; }
void SetStop() {
std::unique_lock<std::shared_mutex> ul(sm_);
is_run_ = false;
NotifyAll();
}
private:
Container con_;
std::condition_variable_any cv_;
std::shared_mutex sm_;
bool is_run_;
};
NotifyQueue 클래스는 std::shared_mutex와 std::condition_variable_any를 사용해서 queue를 구현합니다.
Empty()와 Size()는 데이터 수정이 없기 때문에 shared_lock을 사용해서 공유 모드 잠금을 사용합니다.
Pop()과 Push()의 경우 데이터 수정이 있기 때문에 unique_lock을 사용해서 배타적 잠금을 사용합니다.
Push함수 동작 방식
push를 수행하면 mutex를 통해서 락을 획득 한 후에 데이터를 추가하고 조건 변수를 통해서 알림을 전송합니다.
Pop() 함수 동작 방식
Pop()함수를 수행하면 Queue에 데이터가 있다면 데이터를 추출하고 데이터가 없다면 데이터가 들어 올 때까지
조건 변수를 통해서 대기합니다.
CommonNotifyQueue 클래스
/////////////////////////////////////////////////////////////////////////
// CommonNotifyQueue는 NotifyQueue 일반적인 사용법에 대한 정의를 구현합니다.
// 사용자는 해당 클래스를 상속받아서 RunExecute 구현한다면 손쉽게 사용 할 수 있습니다.
template<typename OBJ>
class CommonNotifyQueue {
public:
CommonNotifyQueue(size_t run_thread_count = 1) : pfexecute_(nullptr) {
pnotify_ = std::make_shared<NotifyQueue<OBJ>>();
pproducer_ = std::make_shared<Producer<OBJ>>(pnotify_);
pconsumer_ = std::make_shared<Consumer<OBJ>>(pnotify_);
for (int i = 0; i < run_thread_count; i++) {
Run();
}
}
virtual ~CommonNotifyQueue() {}
void Push(OBJ&& obj) {
Push(obj);
}
void Push(const OBJ& obj) {
pproducer_->Push(obj);
}
virtual void RunExecute(const OBJ& obj) = 0;
void Run() {
thread_group_.emplace_back([this] {
pconsumer_->Execute([this](const OBJ& obj) {
RunExecute(obj);
});
});
}
void Stop() {
pnotify_->SetStop();
}
void Join() {
for (auto& t : thread_group_) {
t.join();
}
}
private:
std::shared_ptr<NotifyQueue<OBJ>> pnotify_;
std::shared_ptr <Producer<OBJ>> pproducer_;
std::shared_ptr <Consumer<OBJ>> pconsumer_;
std::vector<std::thread> thread_group_;
std::function<void(const OBJ&)> pfexecute_;
};
CommonNotifyQueue 클래스는 사용자가 멀티 스레드 환경에서 NotifyQueue클래스를 손쉽게 사용할 수 있도록
구현한 클래스입니다.
사용자는 NotifyQueue클래스를 자유롭게 사용할 수 있지만 그렇게 하기 위해서는 내부 구현에 대해서 이해가 필요하기
때문에 접근성이 떨어집니다. 단순히 생산자 및 소비자 패턴의 형태로 NotifyQueue클래스를 사용을 원하는 사용자는
백그라운드 스레드 갯수와 및 RunExecute()함수만 구현해서 손쉽게 접근 할 수 있습니다.
최종 구현 예제
#include <iostream>
#include <mutex>
#include <thread>
#include <functional>
#include <queue>
#include <shared_mutex>
#include <chrono>
#include <string>
#include <condition_variable>
using namespace std;
/////////////////////////////////////////////////////////////////////////
// NotifyQueue는 queue에 데이터를 넣을 때 condition_variable로 notify를 전송합니다.
// queue에 데이터를 가져올 때 queue가 비어 있다면 condition_variable를 통해서 대기합니다.
template<typename OBJ>
class NotifyQueue {
public:
using Container = std::queue<OBJ>;
NotifyQueue() :is_run_(true) {
};
virtual ~NotifyQueue() {};
// 컨테이너가 비어 있는지 확인합니다.
// 데이터 수정이 없기 때문에 shared lock모드를 사용합니다.
bool Empty() {
std::shared_lock<std::shared_mutex> sl(sm_);
return con_.empty();
}
// OBJ를 컨테이너에서 추출합니다.
// 데이터가 없다면 condition_variable을 통해서 대기 합니다.
bool Pop(OBJ& obj) {
std::unique_lock<std::shared_mutex> ul(sm_);
cv_.wait(ul, [this] {return (!con_.empty() || !is_run_); });
if (!is_run_) { return false; }
if(!con_.empty()) {
obj = con_.front();
con_.pop();
return true;
}
return false;
}
// 데이터를 컨테이너에 넣습니다.
// condition_variable 를 통해서 통지합니다.
void Push(const OBJ& obj) {
{
std::unique_lock<std::shared_mutex> sl(sm_);
con_.push(obj);
}
cv_.notify_one();
}
// 모든 대기 스레드에 통지를 전달합니다.
void NotifyAll()
{
cv_.notify_all();
}
// 컨테이너의 크기를 확인합니다.
// 데이터 수정이 없기 때문에 shared lock모드를 사용합니다.
size_t Size() {
std::shared_lock<std::shared_mutex> sl(sm_);
return con_.size();
}
bool IsRun() { return is_run_; }
void SetStop() {
is_run_ = false;
NotifyAll();
}
private:
Container con_;
std::condition_variable_any cv_;
std::shared_mutex sm_;
bool is_run_;
};
/////////////////////////////////////////////////////////////////////////
// NotifyQueue를 사용하는 생산자 클래스를 생성합니다.
template<typename OBJ>
class Producer {
public:
using obj = OBJ;
Producer(std::shared_ptr<NotifyQueue<OBJ>> pnotify_queue) : pnotify_queue_(pnotify_queue) {
}
void Push(const OBJ& str) {
pnotify_queue_->Push(str);
}
private:
std::shared_ptr<NotifyQueue<OBJ>> pnotify_queue_;
};
/////////////////////////////////////////////////////////////////////////
// NotifyQueue를 사용하는 소비자 클래스를 생성합니다.
template<typename OBJ>
class Consumer {
public:
using MyProducer = Producer<std::string>;
Consumer(std::shared_ptr<NotifyQueue<OBJ>> pnotify_queue) : pnotify_queue_(pnotify_queue)
{
}
void Execute(std::function<void(const OBJ& obj)> phandle) {
do {
OBJ obj;
auto bsuccess = pnotify_queue_->Pop(obj);
if (bsuccess) phandle(obj);
} while (pnotify_queue_->IsRun());
}
private:
std::shared_ptr<NotifyQueue<OBJ>> pnotify_queue_;
};
/////////////////////////////////////////////////////////////////////////
// CommonNotifyQueue는 NotifyQueue 일반적인 사용법에 대한 정의를 구현합니다.
// 사용자는 해당 클래스를 상속받아서 RunExecute 구현한다면 손쉽게 사용 할 수 있습니다.
template<typename OBJ>
class CommonNotifyQueue {
public:
CommonNotifyQueue(size_t run_thread_count = 1) : pfexecute_(nullptr) {
pnotify_ = std::make_shared<NotifyQueue<OBJ>>();
pproducer_ = std::make_shared<Producer<OBJ>>(pnotify_);
pconsumer_ = std::make_shared<Consumer<OBJ>>(pnotify_);
for (int i = 0; i < run_thread_count; i++) {
Run();
}
}
virtual ~CommonNotifyQueue() {}
void Push(OBJ&& obj) {
Push(obj);
}
void Push(const OBJ& obj) {
pproducer_->Push(obj);
}
virtual void RunExecute(const OBJ& obj) = 0;
void Run() {
thread_group_.emplace_back([this] {
pconsumer_->Execute([this](const OBJ& obj) {
RunExecute(obj);
});
});
}
void Stop() {
pnotify_->SetStop();
}
void Join() {
for (auto& t : thread_group_) {
t.join();
}
}
private:
std::shared_ptr<NotifyQueue<OBJ>> pnotify_;
std::shared_ptr <Producer<OBJ>> pproducer_;
std::shared_ptr <Consumer<OBJ>> pconsumer_;
std::vector<std::thread> thread_group_;
std::function<void(const OBJ&)> pfexecute_;
};
///=====================================================
// CommonNotifyQueue를 사용해서 std::string을 컨테이너로 전달받아서 멀티 스레드로 출력하는 예제를 만들어 봅니다.
class PrintMsgManager : public CommonNotifyQueue<std::string> {
public:
using Object = std::string;
PrintMsgManager(size_t run_thread_count = 1) : CommonNotifyQueue<std::string>(run_thread_count) {
}
virtual ~PrintMsgManager() {}
void RunExecute(const std::string& obj) override {
std::cout << "ThreadId[" << std::this_thread::get_id() << "] Processing ==> " << obj.c_str() << "\n";
}
};
int main() {
using namespace std::chrono_literals;
// 백그라운드 스레드 4개로 시작합니다.
PrintMsgManager printManager(4);
std::cout << "Start PrintMsgManager Back Thread 4" << std::endl;
for (int i = 1; i <= 5000; i++) {
printManager.Push(std::to_string(i));
}
std::this_thread::sleep_for(7s);
printManager.Stop();
std::cout << "Notify Stop signal" << std::endl;
std::this_thread::sleep_for(1s);
printManager.Join();
std::cout << "All thread Joinable...." << std::endl;
std::cout << "After 3 sec Application Exit " << std::endl;
std::this_thread::sleep_for(3s);
return 0;
}
반응형
'c++ > c++' 카테고리의 다른 글
[c++] 반환값 최적화 (RVO, return-value-optimization) (0) | 2024.09.26 |
---|---|
[c++] find_if (0) | 2024.09.14 |
[c++] 키워드 (0) | 2020.03.08 |
[c++] vector (0) | 2019.10.03 |
[c++] chrono를 사용한 수행 시간 출력 클래스 (0) | 2019.09.20 |