Inter-thread communication in C++ #2

Posted at 2008. 10. 24. 00:47 // in S/W개발/C++ 이야기 // by 김윤수


어제에 이어 type-safe하게 inter-thread communication을 하는 방법을 계속 고민해 봤습니다. 아주 일반적이진 않지만 적어도 인자를 하나 갖고 리턴 데이터는 없는 멤버 함수를 호출하는 경우는 처리될 수 있도록 작성해 봤습니다.

가장 핵심적인 부분은 doIt class와  callIt member function, m_callQ 입니다.

callIt() 코드를 보시면 먼저 bind(mf, this, _1)를 이용해서 ArgType 인자 하나만 필요로 하는 functor로 만들고, 그 functor와 ArgType 인자인 arg를 doIt instance에 넣어줍니다. 그렇게 하면 doIt instance를 boost::function<void (void)> 타입에 assign이 가능하게 됩니다. 즉, 아무런 인자도 없고 리턴값도 없는 functor가 되는 것이지요. m_callQ는 이런 instance들의 queue이구요. 이렇게 함으로써 서로 다른 타입의 인자를 갖는 member function들을 하나의 call queue로 관리할 수 있게 한 것이지요. 솔직히 이게 제일 어려운 점 중 하나였습니다.

아주 아름다운 솔루션은 아니지만 그래도 그럭저럭 실제 코드에 적용해 볼만 한 것 같습니다. 인자가 하나뿐이더라도 여러 개 인자가 있는 경우를 그런 인자를 필드로 정의하는 구조체를 하나 만들거너 tuple을 쓰면 되니까요. 그리고 inter-thread communication을 할 경우에는 리턴값을 필요로 하는 synchronous call보다는 메시지를 보내는 것같은 asynchronous call이 많을테니 그럭저럭 쓸만할 것 같습니다.

무엇보다 맘에 드는 것은 type-safe하게 ITC를 할 수 있다는 것입니다. void* 와 casting을 사용하지 않고 template을 사용했기 때문에 컴파일 타임시에 인자가 맞지 않은 경우에는 에러가 발생해서 걸러지게 됩니다.

그리고, 이 아이디어를 확장하면 여러개의 인자가 있는 경우도 처리할 수 있지 않을까 하는 생각이 어렴풋하게 드네요.  callIt 이랑 doIt을 2개 인자, 3개 인자, 4개 인자...인 것들도 만들면 될테니까요.

음... 그리고 생각해 보니까 인자가 없는 경우도 만들어 놓아야 겠군요. 저번 소스는 인자가 없는 경우를 처리한 것이었는데 그걸 작업하다 보니 별 생각없이 지워버렸네요. 이전 글 참고해서 다시 만들어 놓아야겠습니다.

혹시 아래 소스 코드 보시고 문제점이나 더 좋은 생각 있으시면 댓글 좀 남겨주세요~

/// @file itc.cpp
#include <iostream>
#include <string>
#include <queue>
#include <functional>
#include <boost/thread.hpp>
#include <boost/lambda/lambda.hpp>
#include <boost/bind.hpp>
#include <boost/signal.hpp>

using namespace std;
using namespace boost;
using namespace boost::posix_time;

template <class ThreadType>
class ITCThread : public thread
{
public:
    typedef boost::function<void (void)> CallType;

    ITCThread(const string& name = "thr"):
        thread(bind(mem_fn(&ITCThread::run), this)),
        m_name(name),
        m_mutex(), m_cond(),
        m_req(false), m_callQ(), m_run(true)
        {}

    const string& getName()
    {
        return m_name;
    }

    void quit(int code)
    {
        callIt<int>(&ITCThread::doQuit, code);
    }

protected:
    template <typename ArgType>
    struct doIt {
        doIt(boost::function<void (ArgType)> call, ArgType arg) :
            m_call(call), m_arg(arg)
            {}

        void operator()()
        {
            m_call(m_arg);
        }

        boost::function<void (ArgType)> m_call;
        ArgType m_arg;
    };

    template <typename ArgType>
    void callIt(void (ThreadType::*mf)(ArgType), ArgType arg)
    {
        typedef boost::function<void (ArgType)> MemFuncType;

        unique_lock<mutex> lock(m_mutex);
        MemFuncType call = bind(mf, (ThreadType*)this, _1);
        m_callQ.push(doIt<ArgType>(call, arg));
        m_req = true;
        m_cond.notify_one();
    }

private:
    void run()
    {
        cout << m_name << ": starting..." << endl;
        while (m_run)
        {
            unique_lock<mutex> lock(m_mutex);
            while (!m_req)
            {
                m_cond.wait(lock);
            }

            while (!m_callQ.empty())
            {
                m_callQ.front()();
                m_callQ.pop();
            }
            m_req = false;
        }

    void doQuit(int code)
    {
        cout << m_name << ": exiting with code = " << code << endl;
        m_run = 0;
    }

    string m_name;
    mutex m_mutex;
    condition_variable m_cond;
    bool m_req;
    queue<CallType>  m_callQ;
    bool m_run;
};

class HelloThread : public ITCThread<HelloThread>
{
public:
    HelloThread(const string& name = "hello") :
        ITCThread<HelloThread>(name)
        {}

    void hello(const string& name)
    {
        callIt<const string&>(&HelloThread::doHello, name);
    }

    void yahoo(double value)
    {
        callIt<double>(&HelloThread::doYahoo, value);
    }

private:
    void doHello(const string& name)
    {
        cout << getName() << ": Hello~ " << name << endl;
    }

    void doYahoo(double value)
    {
        cout << getName() << ": Yahoo! " << value << endl;
    }

};

class PrintVector : public ITCThread<PrintVector>
{
public:
    PrintVector(const string& name = "prvec") :
        ITCThread<PrintVector>(name)
        {}
  
    void print(const vector<int>& vi)
    {
        callIt<const vector<int>&>(&PrintVector::doPrint, vi);
    }

private:
    void doPrint(const vector<int>& vi)
    {
        copy(vi.begin(), vi.end(), ostream_iterator<int>(cout, " "));
        cout << endl;
    }
};

int
main()
{
    HelloThread itc1("H1"), itc2("H2");

    for (int i = 0; i < 10; ++i)
    {
        this_thread::sleep(millisec(1000));
        itc1.hello("world!"), itc1.yahoo(10.0);
        this_thread::sleep(millisec(50));
        itc2.hello("body!"), itc2.yahoo(20.0);
    }
    this_thread::sleep(millisec(1000));
    itc1.quit(1);
    this_thread::sleep(millisec(50));
    itc2.quit(2);

    itc1.join();
    itc2.join();

    vector<int> vi;
    for (int i = 0; i < 10; ++i)
    {
        vi.push_back(i);
    }

    PrintVector prvec;
    prvec.print(vi);
    prvec.quit(0);

    prvec.join();

    return 0;
}

컴파일 해보시려면 boost 1.36.0이 설치되어 있어야 합니다. 그리고 명령행에서 다음과 같이 주시면 됩니다.

g++ -o itc itc.cpp -I/usr/local/include/boost-1_36/ -L/usr/local/lib -lboost_thread-xgcc40-mt

boost 1.36 header 파일이 /usr/local/include/boost-1_36에 설치되어 있고, boost thread library /usr/local/lib에 설치되어 있다고 가정한 것입니다.