한국어
오픈소스포럼
 이곳은 다양한 오픈소스 프로젝트를 소개하고 리뷰, 활용 방법을 공유합니다.

ZeroMQ 비동기 클라이언트/서버 패턴

makersweb 2020.08.13 13:21 조회 수 : 2668

ZeroMQ 비동기 클라이언트와 서버 패턴에 대한 내용으로 ZeroMQ 공식 문서를 기반으로 필요한 내용만 정리한 것이다.

 

이 전 글에서 ZeroMQ의 기본적인 패턴을 소개하였다. REQ-REP 패턴 위에 ROUTER와 DEALER 를 조합하는 고급 패턴을 소개한다.

 

ROUTER-to-DEALER 프록시를 사용하면 REQ-REP 조합을 확장하여 완전한 비동기 클라이언트 및 서버 구조를 설계할 수 있다.

 

REQ와 DEALER의 두 가지 구체적인 차이점을 먼저 보자. 

  • REQ 소켓은 항상 데이터 프레임 앞에 빈(Empty) 구분자 프레임을 보낸다. 딜러는 그렇지 않다.
  • REQ 소켓은 응답을 받기 전에 하나의 메시지만 보낸다. 응답을 받을 때까지 스레드는 차단된다. 이와 반대로 DEALER는 완전히 비동기적이다.

 

다은은 기본적인 ROUTER to DEALER 패턴 C++ 예제 소스코드이다.

//
//  Custom routing Router to Dealer
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>

#include "zhelpers.hpp"
#include <pthread.h>

static void *
worker_task(void *args)
{
    zmq::context_t context(1);
    zmq::socket_t worker(context, ZMQ_DEALER);

#if (defined (WIN32))
    s_set_id(worker, (intptr_t)args);
#else
    s_set_id(worker);          //  Set a printable identity
#endif

    worker.connect("tcp://localhost:5671");

    int total = 0;
    while (1) {
        //  Tell the broker we're ready for work
        s_sendmore(worker, "");
        s_send(worker, "Hi Boss");

        //  Get workload from broker, until finished
        s_recv(worker);     //  Envelope delimiter
        std::string workload = s_recv(worker);
        if ("Fired!" == workload) {
            std::cout << "Completed: " << total << " tasks" << std::endl;
            break;
        }
        total++;

        //  Do some random work
        s_sleep(within(500) + 1);
    }

    return NULL;
}

//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. Each thread has its own
//  context and conceptually acts as a separate process.
int main() {
    zmq::context_t context(1);
    zmq::socket_t broker(context, ZMQ_ROUTER);

    broker.bind("tcp://*:5671");
    srandom((unsigned)time(NULL));

    const int NBR_WORKERS = 10;
    pthread_t workers[NBR_WORKERS];
    for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; ++worker_nbr) {
        pthread_create(workers + worker_nbr, NULL, worker_task, (void *)(intptr_t)worker_nbr);
    }

    //  Run for five seconds and then tell workers to end
    int64_t end_time = s_clock() + 5000;
    int workers_fired = 0;
    while (1) {
        //  Next message gives us least recently used worker
        std::string identity = s_recv(broker);
        {
            s_recv(broker);     //  Envelope delimiter
            s_recv(broker);     //  Response from worker
        }

        s_sendmore(broker, identity);
        s_sendmore(broker, "");

        //  Encourage workers until it's time to fire them
        if (s_clock() < end_time)
            s_send(broker, "Work harder");
        else {
            s_send(broker, "Fired!");
            if (++workers_fired == NBR_WORKERS)
                break;
        }
    }

    for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; ++worker_nbr) {
        pthread_join(workers[worker_nbr], NULL);
    }

    return 0;
}

 

ROUTER to DEALER 의 패턴으로 한 서버가 여러 작업자와 비동기식으로 대화하는 1 대 N 사용 사례를 다양한 클라이언트가 단일 서버와 통신하고 이를 비동기적으로 수행하는 N 대 1 아키텍처로 뒤집을 수 있다.

fig37.png

작동 방식은 다음과 같다.

  • 클라이언트는 서버에 연결하고 요청을 보낸다.
  • 각 요청에 대해 서버는 0 개 이상의 응답을 보낸다.
  • 클라이언트는 응답을 기다리지 않고 여러 요청을 보낼 수 있다.
  • 서버는 새로운 요청을 기다리지 않고 여러 개의 회신을 보낼 수 있다.

 

다음은 비동기 client/server in C++ 예제 소스코드이다.

//  Asynchronous client-to-server (DEALER to ROUTER)
//
//  While this example runs in a single process, that is to make
//  it easier to start and stop the example. Each task has its own
//  context and conceptually acts as a separate process.

#include <vector>
#include <thread>
#include <memory>
#include <functional>

#include <zmq.hpp>
#include "zhelpers.hpp" 

//  This is our client task class.
//  It connects to the server, and then sends a request once per second
//  It collects responses as they arrive, and it prints them out. We will
//  run several client tasks in parallel, each with a different random ID.
//  Attention! -- this random work well only on linux.

class client_task {
public:
    client_task()
        : ctx_(1),
          client_socket_(ctx_, ZMQ_DEALER)
    {}

    void start() {
        // generate random identity
        char identity[10] = {};
        sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000));
        printf("%s\n", identity);
        client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
        client_socket_.connect("tcp://localhost:5570");

        zmq::pollitem_t items[] = {
            { static_cast<void*>(client_socket_), 0, ZMQ_POLLIN, 0 } };
        int request_nbr = 0;
        try {
            while (true) {
                for (int i = 0; i < 100; ++i) {
                    // 10 milliseconds
                    zmq::poll(items, 1, 10);
                    if (items[0].revents & ZMQ_POLLIN) {
                        printf("\n%s ", identity);
                        s_dump(client_socket_);
                    }
                }
                char request_string[16] = {};
                sprintf(request_string, "request #%d", ++request_nbr);
                client_socket_.send(request_string, strlen(request_string));
            }
        }
        catch (std::exception &e) {}
    }

private:
    zmq::context_t ctx_;
    zmq::socket_t client_socket_;
};

//  Each worker task works on one request at a time and sends a random number
//  of replies back, with random delays between replies:

class server_worker {
public:
    server_worker(zmq::context_t &ctx, int sock_type)
        : ctx_(ctx),
          worker_(ctx_, sock_type)
    {}

    void work() {
            worker_.connect("inproc://backend");

        try {
            while (true) {
                zmq::message_t identity;
                zmq::message_t msg;
                zmq::message_t copied_id;
                zmq::message_t copied_msg;
                worker_.recv(&identity);
                worker_.recv(&msg);

                int replies = within(5);
                for (int reply = 0; reply < replies; ++reply) {
                    s_sleep(within(1000) + 1);
                    copied_id.copy(&identity);
                    copied_msg.copy(&msg);
                    worker_.send(copied_id, ZMQ_SNDMORE);
                    worker_.send(copied_msg);
                }
            }
        }
        catch (std::exception &e) {}
    }

private:
    zmq::context_t &ctx_;
    zmq::socket_t worker_;
};

//  This is our server task.
//  It uses the multithreaded server model to deal requests out to a pool
//  of workers and route replies back to clients. One worker can handle
//  one request at a time but one client can talk to multiple workers at
//  once.

class server_task {
public:
    server_task()
        : ctx_(1),
          frontend_(ctx_, ZMQ_ROUTER),
          backend_(ctx_, ZMQ_DEALER)
    {}

    enum { kMaxThread = 5 };

    void run() {
        frontend_.bind("tcp://*:5570");
        backend_.bind("inproc://backend");

        std::vector<server_worker *> worker;
        std::vector<std::thread *> worker_thread;
        for (int i = 0; i < kMaxThread; ++i) {
            worker.push_back(new server_worker(ctx_, ZMQ_DEALER));

            worker_thread.push_back(new std::thread(std::bind(&server_worker::work, worker)));
            worker_thread->detach();
        }

        try {
            zmq::proxy(static_cast<void*>(frontend_),
                       static_cast<void*>(backend_),
                       nullptr);
        }
        catch (std::exception &e) {}

        for (int i = 0; i < kMaxThread; ++i) {
            delete worker;
            delete worker_thread;
        }
    }

private:
    zmq::context_t ctx_;
    zmq::socket_t frontend_;
    zmq::socket_t backend_;
};

//  The main thread simply starts several clients and a server, and then
//  waits for the server to finish.

int main (void)
{
    client_task ct1;
    client_task ct2;
    client_task ct3;
    server_task st;

    std::thread t1(std::bind(&client_task::start, &ct1));
    std::thread t2(std::bind(&client_task::start, &ct2));
    std::thread t3(std::bind(&client_task::start, &ct3));
    std::thread t4(std::bind(&server_task::run, &st));

    t1.detach();
    t2.detach();
    t3.detach();
    t4.detach();

    getchar();
    return 0;
}

 

예제에서 서버는 워커 스레드를 만들고 스레드는 하나의 요청을 동기적으로 처리한다. 내부 대기열을 사용하여 이를 프론트 엔드 소켓에 연결한다. zmq_proxy() 함수를 호출하여 프론트엔드 및 백엔드 소켓을 연결한다.

fig38.png

클라이언트와 서버 사이에서 DEALER to ROUTER 패턴으로 통신하고 있지만 서버 내부의 메인 스레드에서는 DEALER to DEALER 를 사용하고 있다. 워커가 엄격하게 동기화 되어야할 필요가 있는 경우 REP 를 사용할 수 있지만 여러 개의 답장을 비동기로 보내려면 DEALER 소켓이 필요하다. 다음은 이것을 수평적으로 나타낸 것이다.

zmq_.png

 

클라이언트는 단일 프레임으로 구성된 메시지를 보낸다. 서버 스레드는 2 프레임 메시지 (클라이언트 ID가 접두어로 붙은 원본 메시지)를 받는다. 그런 다음 워커는 첫 번째 프레임을 ID로 사용하여 두 번째 프레임에 데이터를 넣고 클라이언트에 다시 보낼 수 있다. ROUTER는 메세지를 해당 클라이언트로 라우팅 한다.