Inter-thread communication in C++ #5

Posted at 2008. 10. 29. 02:42 // in S/W개발/C++ 이야기 // by 김윤수


2008/10/27 - [S/W개발/C++ 이야기] - Inter-thread communication in C++ #4

저번에 이어 ITCThread에 좀 더 기능을 추가해 보았습니다.

1. 기존 Legacy class를 ITCThread로 작동하게 만들어 주는 template 클래스를 추가했습니다.

template <typename Adaptee>
struct ITCAdaptor : public Adaptee, public ITCThread<Adaptee>
{
    ITCAdaptor(const std::string& name): ITCThread<Adaptee>(name) {}
};

이걸 쓰실 때는 두가지 방법이 가능합니다. 한가지는 상속을 이용하는 방법이고, 다른 한가지는 상속 없이 바로 이용하는 방법입니다.

// 상속을 이용하는 방법
struct Hello
{
    void sayHelloTo(const string name)
    {
        cout << "Hello, " << name << "!" << endl;
    }
};

struct HelloITCAdaptor : public ITCAdaptor<Hello>
{
    HelloITCAdaptor(const string& name) : ITCAdaptor<Hello>(name) {}

    void sayHelloTo(const string& name)
    {
        doIt(&Hello::sayHelloTo, name);
    }
};

s = "hello";
HelloITCAdaptor hello(s);
hello.sayHelloTo("Yoonsoo Kim");
hello.quit();
hello.join();

// 바로 이용하는 방법
struct ButtonHandler
{
    void onClick(int x, int y)
    {
        cout << "button clicked at (" << x << ", "
             << y << ")" << endl;

    }
};

typedef ITCAdaptor<ButtonHandler> ButtonHandlerITCAdaptor;

s = "btn_handler";
ButtonHandlerITCAdaptor btnHdlr(s);
btnHdlr.doIt(&ButtonHandler::onClick, 10, 20);
btnHdlr.quit();
btnHdlr.join();

2. 일반 함수를 특정 thread context에서 실행할 수 있게 해주는 Adaptor 클래스를 추가했습니다.

struct FunctionITCAdaptor : public ITCThread<FunctionITCAdaptor>
{
    FunctionITCAdaptor(const std::string& name):
        ITCThread<FunctionITCAdaptor>(name) {}

};

이 클래스를 사용하는 방법도 두 가지 모두 가능합니다.

예를 들어 기존에 다음과 같은 함수가 있었다고 가정하겠습니다.

void printHello()
{
    cout << "Hello, world" << endl;
}

void sayHelloTo(const string name)
{
    cout << "Hello, " << name << "!" << endl;
}

void sayWelcomeToAndGoodByeTo(const string name1, const string name2)
{
    cout << "Welcome, " << name1 << "!" << endl;
    cout << "Good-bye, " << name2 << "!" << endl;
}

FunctionITCAdaptor를 바로 사용하려면 다음과 같이 하시면 됩니다.

s = "fthr";
FunctionITCAdaptor fthr(s);
fthr.doIt(printHello);
fthr.doIt(sayHelloTo, "KKK");
fthr.doIt(sayWelcomeToAndGoodByeTo, "KKK", "YYY");
fthr.quit();
fthr.join();

FunctionITCAdaptor에서 상속을 받은 후 wrapping member function을 작성하는 방식으로 하셔도 됩니다.

// private 상속을 사용해 봤습니다
struct Greeter : private FunctionITCAdaptor
{
    Greeter(const string& name) : FunctionITCAdaptor(name) {}

    // private 상속을 했으므로 선별적으로 공개할 멤버 함수를 명시해야 합니다.
    // using declaration을 사용했습니다
    using FunctionITCAdaptor::quit;
    using FunctionITCAdaptor::join;

    void printHello()
    {
        doIt(::printHello);
    }

    void sayHelloTo(const string& name)
    {
        doIt(::sayHelloTo, name);
    }

    void sayWelcomeToAndGoodByeTo(const string& name1,
                                  const string& name2)

    {
        doIt(::sayWelcomeToAndGoodByeTo, name1, name2);
    }
};

s = "GRTR";
Greeter g(s);
g.printHello();
g.sayHelloTo("YYY");
g.sayWelcomeToAndGoodByeTo("YYY", "KKK");
g.quit();
g.join();

3. 마지막으로 ITCBusyThreadBase라는 클래스를 추가했습니다. 이 클래스는 ITCThreadBase와 달리 나름대로 처리해야 하는 로직이 있어서 그 로직을 지속적으로 처리하는 와중에 중간 중간에 ITC call을 한 번씩 처리하도록 설계되어 있습니다. 다시 말하자면 바쁘게 처리해야할 일이 따로 있고, 가끔 ITC call을 처리하는 방식입니다. 그래서 이름도 ITCBusyThreadBase라고 지었습니다. ITCBusyThreadBase는 다음과 같이 선언되어 있습니다.

namespace itc { namespace details {

class ITCBusyThreadBase : public boost::thread {
public:
    typedef boost::function<void (void)> CallType;

    ITCBusyThreadBase(const std::string& name = "thr");

    const std::string& getName()
    {
        return m_name;
    }

    void quit(int code)
    {
        doItPrivate(&ITCBusyThreadBase::doQuit, code);
    }

    void quit()
    {
        doItPrivate(&ITCBusyThreadBase::doQuit);
    }

    void doIt(CallType call);

private:
    void doItPrivate(void (ITCBusyThreadBase::*mf)(void))
    {
        doIt(CallType(boost::bind(mf, this)));
    }

    void doItPrivate(void (ITCBusyThreadBase::*mf)(int), int arg)
    {
        doIt(CallType(boost::bind(mf, this, arg)));
    }

    void processCalls();
    virtual void process() = 0;
    void run();
    void doQuit(int code);
    void doQuit();

    std::string m_name;
    boost::mutex m_mutex;
    std::queue<CallType>  m_callQ;
    bool m_run;
};

};

보시다시피 대부분은 ITCThreadBase와 동일하지만, procesCalls() 라는 내부 멤버 함수가 추가되었고, process() 멤버 함수가 순수가상함수로 선언되어 있습니다. 그리고 멤버 변수를 보면 condition_variable은 사용하지 않는다는 것을 확인할 수 있습니다. process()는 아까 설명에서와 같이 ITCBusyThreadBase 가 본래 처리해야할 로직입니다. process() 안에서 계속 무한 루프를 돌거나 너무 오래 걸리면 ITC call 처리를 전혀 하지 못하거나 처리가 늦어지게 되므로 주의해야 합니다. 다음은 ITCBusyThreadBase 구현입니다.

가장 핵심적인 doIt(), processCalls(), run() 이 세 가지만 나타내 보았습니다.

// condition_variable을 사용하지 않고 있습니다. ITCBusyThreadBase는 ITC call이 올 때까지
// 기다리는 것이 아니라 자발적으로 중간 중간에 ITC call을 체크하는 방식으로
// 작동하기 때문입니다
void
ITCBusyThreadBase::doIt(CallType call)
{
    unique_lock<mutex> lock(m_mutex);
    m_callQ.push(call);
}

// ITC call이 있는지 체크한 후 있으면 하나만 처리합니다.
// 여러개를 처리하다보면 본래 처리해야 하는 로직이 늦어질 수 있으므로
// 하나만 처리합니다
void
ITCBusyThreadBase::processCalls()
{
    CallType call;
    bool req = false;
  
    {
        unique_lock<mutex> lock(m_mutex);
        if (!m_callQ.empty())
        {
            call = m_callQ.front();
            m_callQ.pop();
            req = true;
        }
    }
  
    if (req)
        call();
}

// process() 안에서 ITCBusyThreadBase 클래스 사용자가 정의한 로직이
// 수행됩니다. process() 안에서 너무 오래 머물게 되면 processCalls() 호출이
// 늦어지므로 ITC call 처리가 늦어지게 됩니다.
void
ITCBusyThreadBase::run()
{
    cout << m_name << ": starting..." << endl;
    while (m_run)
    {
        process();
        processCalls();
    }
}

ITCBusyThreadBase를 사용한 예제는 다음과 같습니다.

// ITCBusyThreadBase에서   상속받은 클래스를 정의하고, process()를 구현하고,
// 필요에 따라 공개하고자 하는 멤버함수를 추가합니다.
class Controller : private ITCBusyThreadBase
{
public:
    Controller(const string& name) : ITCBusyThreadBase(name) {}

    using ITCBusyThreadBase::quit;
    using ITCBusyThreadBase::join;

    void setPosition(const string& name, double pos)
    {
        cout << getName() << ": queueing setPosition() request" << endl;
        doIt(CallType(bind(&Controller::doSetPosition,
                           this, name, pos)));

    }

private:
    void doSetPosition(const string name, double pos)
    {
        cout << getName() << ": setting " << name
             << "'s position to " << pos << endl;
    }

    virtual void process()
    {
        static int cnt = 0;
        if (cnt++ % 100 == 0)
        {
            cout << getName() << ": processing..." << endl;
        }
        usleep(100000);
    }
};

string name;
double pos;
Controller ctrl("CTRL");
  
do
{
    cout << "enter name and position: ";
    cin >> name >> pos;
    cout << "setting " << name << "'s position to " << pos << endl;
    ctrl.setPosition(name, pos);
    usleep(10000);
} while(cin);

for (int i = 0; i < 100; ++i)
{
    ctrl.setPosition(name, pos);
    usleep(10000);
}
ctrl.quit();
ctrl.join();