Inter-thread communication in C++ #3

Posted at 2008. 10. 26. 00:12 // in S/W개발/C++ 이야기 // by 김윤수


2008/10/24 - [S/W개발/C++ 이야기] - Inter-thread communication in C++ #2

윗 글에 이어 ITCThread를 좀 더 개선해 보았습니다. 이번 개선 사항은

1) 인자가 없는 멤버 함수, 인자가 하나인 멤버 함수, 인자가 두개인 멤버 함수까지 지원이 가능하게 됐고,
2) ITCThread에서 직접 상속을 받아서 Inter-thread communication을 통해 호출될 수 있는 member function을 정의하는 패턴 외에 기존에 이미 존재하던 클래스를 ITCThread context 안에서 실행되게 하는 방식도 추가했습니다. 즉, 기존에는

class NewThreadType : public ITCThread<NewThreadType>
{
  ......
};

이런 방식만 가능했기 때문에 ITCThread 기능을 사용하려면 새로운 thread class를 따로 정의해야 했으나, 이제는 다음과 같이 기존 class를 ITCThread로 바꾸는 것도 가능해졌습니다.

class ITCThreadAdaptor : public LegacyType,
                         public ITCThread<LegacyType>
{
  ......
};

이걸 위해 ITCThread template의 부모 클래스로 Thread 라는 클래스들 두고 기존에 ITCThread에 정의되어 있던 대부분의 멤버 변수 및 멤버 함수를 Thread로 옮겼습니다. quit() 이라는 멤버 함수는 ITCThread 에 모두 공통적으로 있어야 하는 멤버 함수라고 생각했는데, 기존처럼 quit 을  ITCThread 에 정의할 경우 2)번 패턴에서 quit() 멤버 함수를 정의되지 않은 멤버 함수라는 컴파일 에러가 발생하였기 때문입니다.

Thread  라는 클래스를 정의하면서 대부분의 멤버 함수 및 멤버 변수를 public 으로 선언해서 ITCThread 에서 제한없이 사용할 수 있게 함으로써 ITCThread 구현을 간단하게 만들었고, Thread의 public 멤버 들이 ITCThread로부터 상속받는 클래스에 공개되는 것을 방지하기 위해 ITCThread는 Thread를 private 상속하도록 했습니다. 이와 함께 Thread::CallType, Thread::quit, thread::join, Thread::getName 등은 using declaration 구문을 사용하여 선별적으로 하위클래스에 공개되도록 했습니다.

1) 번이 가능하게 하기 위해 doItNoArg, doItWithOneArg<ArgType>, doItWithTwoArgs<ArgType1, ArgType2>  helper class를 정의하고, callIt, callIt<ArgType>, callIt<ArgType1, ArgType2> template 멤버 함수를 정의했습니다.

다음은 코드입니다.

/// Inter-thread communication
/// @file itc.cpp

#include <iostream>
#include <string>
#include <queue>
#include <functional>
#include <boost/thread.hpp>
#include <boost/lambda/lambda.hpp>
#include <boost/bind.hpp>
#include <boost/signal.hpp>

using namespace std;
using namespace boost;
using namespace boost::posix_time;

struct Thread : public thread
{
    typedef boost::function<void (void)> CallType;
   
    Thread(const string& name = "thr"):
        thread(bind(mem_fn(&Thread::run), this)),
        m_name(name),
        m_mutex(), m_cond(),
        m_req(false), m_callQ(), m_run(true)
        {}

    const string& getName()
    {
        return m_name;
    }

    void quit(int code)
    {
        callIt(&Thread::doQuit, code);
    }
   
    void quit()
    {
        callIt(&Thread::doQuit);
    }

private:
    struct doItWithNoArg {
        doItWithNoArg(CallType call) : m_call(call)    {}
       
        void operator()()
        {
            m_call();
        }
       
        CallType m_call;
    };

    void callIt(void (Thread::*mf)(void))
    {
        unique_lock<mutex> lock(m_mutex);
        m_callQ.push(doItWithNoArg(bind(mf, (Thread*)this)));
        m_req = true;
        m_cond.notify_one();
    }

    void run()
    {
        cout << m_name << ": starting..." << endl;
        while (m_run)
        {
            unique_lock<mutex> lock(m_mutex);
            while (!m_req)
            {
                m_cond.wait(lock);
            }
           
            while (!m_callQ.empty())
            {
                m_callQ.front()();
                m_callQ.pop();
            }
            m_req = false;
        }
    }

    struct doItWithIntArg {
        typedef boost::function<void (int)> CallType;
       
        doItWithIntArg(CallType call, int arg) :
            m_call(call), m_arg(arg)
            {}
       
        void operator()()
        {
            m_call(m_arg);
        }
       
        CallType m_call;
        int m_arg;
    };
   
    void callIt(void (Thread::*mf)(int), int arg)
    {
        unique_lock<mutex> lock(m_mutex);
        m_callQ.push(doItWithIntArg(bind(mf, (Thread*)this, _1),
                                    arg));

        m_req = true;
        m_cond.notify_one();
    }   

    void doQuit(int code)
    {
        cout << m_name << ": exiting with code = " << code << endl;
        m_run = 0;
    }
   
    void doQuit()
    {
        cout << m_name << ": exiting without code" << endl;
        m_run = 0;
    }

// Declare the member variables as public so that ITCThread
// is easy to implement
public:
    string m_name;
    mutex m_mutex;
    condition_variable m_cond;
    bool m_req;
    queue<CallType>  m_callQ;
    bool m_run;   
};

template <class ThreadType>
class ITCThread : private Thread
{
public:   
    ITCThread(const string& name = "thr"): Thread(name) {}

    using Thread::quit;
    using thread::join;
    using Thread::getName;

protected:
    using Thread::CallType;

    // for member functions for no arguments
    struct doItWithNoArg {
        doItWithNoArg(CallType call) : m_call(call)    {}
       
        void operator()()
        {
            m_call();
        }
       
        CallType m_call;
    };

    void callIt(void (ThreadType::*mf)(void))
    {
        unique_lock<mutex> lock(m_mutex);
        m_callQ.push(doItWithNoArg(bind(mf, (ThreadType*)this)));
        m_req = true;
        m_cond.notify_one();
    }

    // for member functions with one argument   
    template <typename ArgType>
    struct doItWithOneArg {
        typedef boost::function<void (ArgType)> OneArgCallType;
       
        doItWithOneArg(OneArgCallType call, ArgType arg) :
            m_call(call), m_arg(arg)
            {}
       
        void operator()()
        {
            m_call(m_arg);
        }
       
        OneArgCallType m_call;
        ArgType m_arg;
    };

    template <typename ArgType>
    void callIt(void (ThreadType::*mf)(ArgType), ArgType arg)
    {
        unique_lock<mutex> lock(m_mutex);
        m_callQ.push(doItWithOneArg<ArgType>
                     (bind(mf, (ThreadType*)this, _1), arg));

        m_req = true;
        m_cond.notify_one();
    }

    // for member functions with two arguments   
    template <typename ArgType1, typename ArgType2>
    struct doItWithTwoArgs {
        typedef boost::function<void (ArgType1, ArgType2)>
            TwoArgsCallType;


        doItWithTwoArgs(TwoArgsCallType call,
                        ArgType1 arg1,
                        ArgType2 arg2) :

            m_call(call), m_arg1(arg1), m_arg2(arg2)
            {}
       
        void operator()()
        {
            m_call(m_arg1, m_arg2);
        }
       
        TwoArgsCallType m_call;
        ArgType1 m_arg1;
        ArgType2 m_arg2;
    };

    template <typename ArgType1, typename ArgType2>
    void callIt(void (ThreadType::*mf)(ArgType1, ArgType2),
                ArgType1 arg1, ArgType2 arg2)

    {
        unique_lock<mutex> lock(m_mutex);
        m_callQ.push(doItWithTwoArgs<ArgType1,ArgType2>
                     (bind(mf, (ThreadType*)this, _1, _2),
                     arg1, arg2));

        m_req = true;
        m_cond.notify_one();
    }

};

class HelloThread : public ITCThread<HelloThread>
{
public:
    HelloThread(const string& name = "hello") :
        ITCThread<HelloThread>(name) {}
   
    void hello(const string& name)
    {
        callIt<const string&>(&HelloThread::doHello, name);
    }

    void yahoo()
    {
        callIt(&HelloThread::doYahoo);
    }

private:
    void doHello(const string& name)
    {
        cout << getName() << ": Hello~ " << name << endl;
    }
   
    void doYahoo()
    {
        cout << getName() << ": Yahoo~~~~~!" << endl;
    }
};

class PrintVector : public ITCThread<PrintVector>
{
public:
    PrintVector(const string& name = "prvec") :
        ITCThread<PrintVector>(name)
        {}

    // one argument   
    void print(const vector<int>& vi)
    {
        callIt<const vector<int> >(&PrintVector::doPrint, vi);
    }

    // two arguments   
    void printTwoVectors(const vector<int>& vi,
                         const vector<double>& vd)

    {
        callIt<const vector<int>, const vector<double> >
            (&PrintVector::doPrintTwoVectors, vi, vd);
    }
   
private:
    void doPrint(const vector<int> vi)
    {
        copy(vi.begin(), vi.end(), ostream_iterator<int>(cout, " "));
        cout << endl;
    }

    void doPrintTwoVectors(const vector<int> vi,
                           const vector<double> vd)

    {
        copy(vi.begin(), vi.end(), ostream_iterator<int>(cout, " "));
        cout << endl;
        copy(vd.begin(), vd.end(),
             ostream_iterator<double>(cout, " "));

        cout << endl;
    }
};

// legacy class
struct Hello
{
    void sayHelloTo(const string name)
    {
        cout << "Hello, " << name << "!" << endl;
    }
};

// Adapt the Hello for ITCThread
// With this adaptor, Hello class can be executed
// in the context of another thread

struct HelloITCAdaptor : public Hello, public ITCThread<Hello>
{
    HelloITCAdaptor(const string& name) : ITCThread<Hello>(name) {}

    void sayHelloTo(const string& name)
    {
        callIt<const string>(&Hello::sayHelloTo, name);
    }
};

int
main()
{
    HelloThread itc1("H1"), itc2("H2");
   
    for (int i = 0; i < 10; ++i)
    {
        this_thread::sleep(millisec(1000));
        itc1.hello("world!"), itc1.yahoo();
        this_thread::sleep(millisec(50));
        itc2.hello("body!"), itc2.yahoo();
    }
    this_thread::sleep(millisec(1000));
    itc1.quit(1);
    this_thread::sleep(millisec(50));
    itc2.quit(2);
   
    itc1.join();
    itc2.join();

    PrintVector prvec;

    vector<int> vi;
    for (int i = 0; i < 10; ++i)
    {
        vi.push_back(i);
    }
    prvec.print(vi);
    this_thread::sleep(millisec(50));

    vector<double> vd;
    for (int i = 0; i < 10; ++i)
    {
        vd.push_back(i * 10.5);
    }
    prvec.printTwoVectors(vi, vd);

    prvec.quit();
    prvec.join();
   
    HelloITCAdaptor hello("hello");
    for (int i = 0; i < 1000; ++i)
    {
        hello.sayHelloTo("Yoonsoo Kim");
    }
    hello.quit();
    hello.join();

    return 0;
}

다음에는 꼭 멤버 함수 뿐만이 아닌 이미 정의되어 있는 함수를 특정 thread context에서 호출하는 기능도 추가해 보겠습니다. 그리고, callIt 멤버 함수들을 public 으로 변경하는 것도 생각 중입니다. 그렇게 하면 ITCThread에서 상속받은 클래스에서 굳이 호출하고 싶은 member function 들 즉, 위 예에서는 HelloThread::doHello, HelloThread::doYahoo, PrintVector::doPrint, HelloITCAdaptor::sayHelloTo 를 위한 wrapping member function을 따로 작성할 필요도 없어질테니까요. callIt을 쓰면 일반 기존 member function 호출 형태와는 약간 달라지긴 하지만 모든 member function에 대한 wrapping member function을 작성하는 것도 귀찮은 일이니 callIt을 public으로 선언하는 것도 괜찮을 것 같습니다.

비합리적인 비방을 제외한 질문, 개선사항, 버그보고, 근거있는 비평 모든 종류의 댓글 환영하니 댓글 쫌~~~!(비굴 모드). 아주 기초적인 질문도 환영입니다. ^^