이곳은 다양한 오픈소스 프로젝트를 소개하고 리뷰, 활용 방법을 공유합니다.
조회 수 3962 추천 수 0 댓글 0
?

단축키

Prev이전 문서

Next다음 문서

크게 작게 위로 아래로 댓글로 가기 인쇄 첨부
?

단축키

Prev이전 문서

Next다음 문서

크게 작게 위로 아래로 댓글로 가기 인쇄 첨부

ZeroMQ 비동기 클라이언트와 서버 패턴에 대한 내용으로 ZeroMQ 공식 문서를 기반으로 필요한 내용만 정리한 것이다.

 

이 전 글에서 ZeroMQ의 기본적인 패턴을 소개하였다. REQ-REP 패턴 위에 ROUTER와 DEALER 를 조합하는 고급 패턴을 소개한다.

 

ROUTER-to-DEALER 프록시를 사용하면 REQ-REP 조합을 확장하여 완전한 비동기 클라이언트 및 서버 구조를 설계할 수 있다.

 

REQ와 DEALER의 두 가지 구체적인 차이점을 먼저 보자. 

  • REQ 소켓은 항상 데이터 프레임 앞에 빈(Empty) 구분자 프레임을 보낸다. 딜러는 그렇지 않다.
  • REQ 소켓은 응답을 받기 전에 하나의 메시지만 보낸다. 응답을 받을 때까지 스레드는 차단된다. 이와 반대로 DEALER는 완전히 비동기적이다.

 

다은은 기본적인 ROUTER to DEALER 패턴 C++ 예제 소스코드이다.

//
//  Custom routing Router to Dealer
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>

#include "zhelpers.hpp"
#include <pthread.h>

static void *
worker_task(void *args)
{
    zmq::context_t context(1);
    zmq::socket_t worker(context, ZMQ_DEALER);

#if (defined (WIN32))
    s_set_id(worker, (intptr_t)args);
#else
    s_set_id(worker);          //  Set a printable identity
#endif

    worker.connect("tcp://localhost:5671");

    int total = 0;
    while (1) {
        //  Tell the broker we're ready for work
        s_sendmore(worker, "");
        s_send(worker, "Hi Boss");

        //  Get workload from broker, until finished
        s_recv(worker);     //  Envelope delimiter
        std::string workload = s_recv(worker);
        if ("Fired!" == workload) {
            std::cout << "Completed: " << total << " tasks" << std::endl;
            break;
        }
        total++;

        //  Do some random work
        s_sleep(within(500) + 1);
    }

    return NULL;
}

//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. Each thread has its own
//  context and conceptually acts as a separate process.
int main() {
    zmq::context_t context(1);
    zmq::socket_t broker(context, ZMQ_ROUTER);

    broker.bind("tcp://*:5671");
    srandom((unsigned)time(NULL));

    const int NBR_WORKERS = 10;
    pthread_t workers[NBR_WORKERS];
    for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; ++worker_nbr) {
        pthread_create(workers + worker_nbr, NULL, worker_task, (void *)(intptr_t)worker_nbr);
    }

    //  Run for five seconds and then tell workers to end
    int64_t end_time = s_clock() + 5000;
    int workers_fired = 0;
    while (1) {
        //  Next message gives us least recently used worker
        std::string identity = s_recv(broker);
        {
            s_recv(broker);     //  Envelope delimiter
            s_recv(broker);     //  Response from worker
        }

        s_sendmore(broker, identity);
        s_sendmore(broker, "");

        //  Encourage workers until it's time to fire them
        if (s_clock() < end_time)
            s_send(broker, "Work harder");
        else {
            s_send(broker, "Fired!");
            if (++workers_fired == NBR_WORKERS)
                break;
        }
    }

    for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; ++worker_nbr) {
        pthread_join(workers[worker_nbr], NULL);
    }

    return 0;
}

 

ROUTER to DEALER 의 패턴으로 한 서버가 여러 작업자와 비동기식으로 대화하는 1 대 N 사용 사례를 다양한 클라이언트가 단일 서버와 통신하고 이를 비동기적으로 수행하는 N 대 1 아키텍처로 뒤집을 수 있다.

fig37.png

작동 방식은 다음과 같다.

  • 클라이언트는 서버에 연결하고 요청을 보낸다.
  • 각 요청에 대해 서버는 0 개 이상의 응답을 보낸다.
  • 클라이언트는 응답을 기다리지 않고 여러 요청을 보낼 수 있다.
  • 서버는 새로운 요청을 기다리지 않고 여러 개의 회신을 보낼 수 있다.

 

다음은 비동기 client/server in C++ 예제 소스코드이다.

//  Asynchronous client-to-server (DEALER to ROUTER)
//
//  While this example runs in a single process, that is to make
//  it easier to start and stop the example. Each task has its own
//  context and conceptually acts as a separate process.

#include <vector>
#include <thread>
#include <memory>
#include <functional>

#include <zmq.hpp>
#include "zhelpers.hpp" 

//  This is our client task class.
//  It connects to the server, and then sends a request once per second
//  It collects responses as they arrive, and it prints them out. We will
//  run several client tasks in parallel, each with a different random ID.
//  Attention! -- this random work well only on linux.

class client_task {
public:
    client_task()
        : ctx_(1),
          client_socket_(ctx_, ZMQ_DEALER)
    {}

    void start() {
        // generate random identity
        char identity[10] = {};
        sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000));
        printf("%s\n", identity);
        client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
        client_socket_.connect("tcp://localhost:5570");

        zmq::pollitem_t items[] = {
            { static_cast<void*>(client_socket_), 0, ZMQ_POLLIN, 0 } };
        int request_nbr = 0;
        try {
            while (true) {
                for (int i = 0; i < 100; ++i) {
                    // 10 milliseconds
                    zmq::poll(items, 1, 10);
                    if (items[0].revents & ZMQ_POLLIN) {
                        printf("\n%s ", identity);
                        s_dump(client_socket_);
                    }
                }
                char request_string[16] = {};
                sprintf(request_string, "request #%d", ++request_nbr);
                client_socket_.send(request_string, strlen(request_string));
            }
        }
        catch (std::exception &e) {}
    }

private:
    zmq::context_t ctx_;
    zmq::socket_t client_socket_;
};

//  Each worker task works on one request at a time and sends a random number
//  of replies back, with random delays between replies:

class server_worker {
public:
    server_worker(zmq::context_t &ctx, int sock_type)
        : ctx_(ctx),
          worker_(ctx_, sock_type)
    {}

    void work() {
            worker_.connect("inproc://backend");

        try {
            while (true) {
                zmq::message_t identity;
                zmq::message_t msg;
                zmq::message_t copied_id;
                zmq::message_t copied_msg;
                worker_.recv(&identity);
                worker_.recv(&msg);

                int replies = within(5);
                for (int reply = 0; reply < replies; ++reply) {
                    s_sleep(within(1000) + 1);
                    copied_id.copy(&identity);
                    copied_msg.copy(&msg);
                    worker_.send(copied_id, ZMQ_SNDMORE);
                    worker_.send(copied_msg);
                }
            }
        }
        catch (std::exception &e) {}
    }

private:
    zmq::context_t &ctx_;
    zmq::socket_t worker_;
};

//  This is our server task.
//  It uses the multithreaded server model to deal requests out to a pool
//  of workers and route replies back to clients. One worker can handle
//  one request at a time but one client can talk to multiple workers at
//  once.

class server_task {
public:
    server_task()
        : ctx_(1),
          frontend_(ctx_, ZMQ_ROUTER),
          backend_(ctx_, ZMQ_DEALER)
    {}

    enum { kMaxThread = 5 };

    void run() {
        frontend_.bind("tcp://*:5570");
        backend_.bind("inproc://backend");

        std::vector<server_worker *> worker;
        std::vector<std::thread *> worker_thread;
        for (int i = 0; i < kMaxThread; ++i) {
            worker.push_back(new server_worker(ctx_, ZMQ_DEALER));

            worker_thread.push_back(new std::thread(std::bind(&server_worker::work, worker)));
            worker_thread->detach();
        }

        try {
            zmq::proxy(static_cast<void*>(frontend_),
                       static_cast<void*>(backend_),
                       nullptr);
        }
        catch (std::exception &e) {}

        for (int i = 0; i < kMaxThread; ++i) {
            delete worker;
            delete worker_thread;
        }
    }

private:
    zmq::context_t ctx_;
    zmq::socket_t frontend_;
    zmq::socket_t backend_;
};

//  The main thread simply starts several clients and a server, and then
//  waits for the server to finish.

int main (void)
{
    client_task ct1;
    client_task ct2;
    client_task ct3;
    server_task st;

    std::thread t1(std::bind(&client_task::start, &ct1));
    std::thread t2(std::bind(&client_task::start, &ct2));
    std::thread t3(std::bind(&client_task::start, &ct3));
    std::thread t4(std::bind(&server_task::run, &st));

    t1.detach();
    t2.detach();
    t3.detach();
    t4.detach();

    getchar();
    return 0;
}

 

예제에서 서버는 워커 스레드를 만들고 스레드는 하나의 요청을 동기적으로 처리한다. 내부 대기열을 사용하여 이를 프론트 엔드 소켓에 연결한다. zmq_proxy() 함수를 호출하여 프론트엔드 및 백엔드 소켓을 연결한다.

fig38.png

클라이언트와 서버 사이에서 DEALER to ROUTER 패턴으로 통신하고 있지만 서버 내부의 메인 스레드에서는 DEALER to DEALER 를 사용하고 있다. 워커가 엄격하게 동기화 되어야할 필요가 있는 경우 REP 를 사용할 수 있지만 여러 개의 답장을 비동기로 보내려면 DEALER 소켓이 필요하다. 다음은 이것을 수평적으로 나타낸 것이다.

zmq_.png

 

클라이언트는 단일 프레임으로 구성된 메시지를 보낸다. 서버 스레드는 2 프레임 메시지 (클라이언트 ID가 접두어로 붙은 원본 메시지)를 받는다. 그런 다음 워커는 첫 번째 프레임을 ID로 사용하여 두 번째 프레임에 데이터를 넣고 클라이언트에 다시 보낼 수 있다. ROUTER는 메세지를 해당 클라이언트로 라우팅 한다.

TAG •

  1. 라즈베리파이4에서 openFrameworks 예제 실행

    Date2020.12.13 Bymakersweb Views2201
    Read More
  2. Dear ImGui, 경량의 C++ 용 GUI 및 Widget 라이브러리

    Date2020.11.28 Bymakersweb Views11885
    Read More
  3. GENIVI DLT(Diagnostic Log and Trace) 활용

    Date2020.11.19 Bymakersweb Views13491
    Read More
  4. 윈도우에서 안드로이드 flutter 프로그래밍 개발환경 구축(with Visual Studio Code)

    Date2020.09.16 Bymakersweb Views2873
    Read More
  5. 가볍고 쉬운 임베디드용 그래픽 라이브러리 - LVGL

    Date2020.09.16 Bymakersweb Views6006
    Read More
  6. Qt와 GStreamer 로 작성한 flac 오디오 재생 예제

    Date2020.09.05 Bymakersweb Views2563
    Read More
  7. ZeroMQ 비동기 클라이언트/서버 패턴

    Date2020.08.13 Bymakersweb Views3962
    Read More
  8. ZeroMQ의 기본 메세지 패턴들

    Date2020.07.31 Bymakersweb Views11124
    Read More
  9. [SDL2 와 OpenGL]윈도우 생성과 2D그래픽

    Date2020.04.11 Bymakersweb Views4701
    Read More
  10. Pluma(C++ Plug-in Management Framework) 튜토리얼

    Date2019.12.07 Bymakersweb Views15661
    Read More
  11. No Image

    도커(docker)설치 및 기본 명령어

    Date2019.12.02 Bymakersweb Views2193
    Read More
  12. webOS소개 및 Raspberry Pi 3 에서 실행

    Date2019.10.13 Bymakersweb Views5550
    Read More
  13. 리눅스에서 SDL2 최신버전 컴파일과 Qt Creator로 개발환경 구성

    Date2019.10.06 Bymakersweb Views4886
    Read More
  14. 텔레그램(Telegram) Bot 개발

    Date2019.07.21 Bymakersweb Views7721
    Read More
  15. No Image

    GDBus 튜토리얼(GDBus tutorial)

    Date2019.06.30 Bymakersweb Views13512
    Read More
  16. Wayland IVI Extension 간단 리뷰

    Date2019.05.12 Bymakersweb Views3799
    Read More
Board Pagination Prev 1 2 Next
/ 2