한국어
오픈소스포럼
 

ZeroMQ의 기본 메세지 패턴들

makersweb 2020.07.31 11:37 조회 수 : 8

ØMQ는 자체 메시징 인프라를 구축 할 수있는 깔끔한 메시징 라이브러리다. 소켓보다 수준 높은 개념이며 메시징 시스템을 구축하기 위해 가능한 한 낮은 수준의 개념 프레임워크를 제공한다.

한 가지 작업을 잘 수행하는 다양한 응용프로그램에서 서비스를 처리할 수 있는 프레임워크를 구축하는 데 도움이 될 수 있다.

 

ZeroMQ 메시징 패턴은 상호연결된 시스템 간의 통신 흐름을 설명하는 네트워크 지향 아키텍처 패턴이다. ZeroMQ는 이러한 패턴을 활용할 수 있도록 사전 최적화된 소켓을 제공한다. 

어떤 시스템이 서로 연결될 수 있는지, 그리고 그들 사이의 의사소통의 흐름은 무엇인지. 다시 말해, ZeroMQ 패턴을 이해하려면 소켓 유형과 그 작동 방식을 이해해야한다.

 

기본으로 제공하는 핵심적인 ØMQ 패턴은 다음과 같다 :

Request-reply : 클라이언트와 서비스의 집합을 연결하는 패턴.

Publish-subscribe : publisher와 subscriber 집합을 연결하는 패턴.

Pipeline : Push/Pull 소켓 쌍으로 단방향 통신에 사용된다.

 

다음은 ZeroMQ로 사용가능한 연결 쌍(양쪽이 바인딩 할 수 있음)으로 유효한 소켓조합이다 :

PUB and SUB

REQ and REP

REQ and ROUTER

DEALER and REP

DEALER and ROUTER

DEALER and DEALER

ROUTER and ROUTER

PUSH and PULL

PAIR and PAIR

 

REQ-REP

REQ-REP 패턴은 클라이언트/서버 모델이며 가장 기본적인 패턴에 해당한다. 클라이언트는 요청을 보내고 서버는 요청에 응답한다.

reqrep.png

 

클라이언트는 한 반복문 내에서 zmq_send()을 호출한 뒤에 zmq_recv()를 호출한다 (혹은 필요하다면 한번만). 다른 시퀀스를 수행하는 것 (예를 들어 한번에 두 개의 메시지를 보내는 것) 은 send 혹은 recv 호출로부터 -1의 반환 코드를 얻게될 것이다.

//
//  Hello World server in C++
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>

int main () {
    //  Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REP);
    socket.bind ("tcp://*:5555");

    while (true) {
        zmq::message_t request;

        //  Wait for next request from client
        socket.recv (&request);
        std::cout << "Received Hello" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (5);
        memcpy ((void *) reply.data (), "World", 5);
        socket.send (reply);
    }
    return 0;
}

 

 

PUB-SUB

두 번째 고전적인 패턴은 서버가 클라이언트들에게 정보를 PUSH하는 단방향 데이터 배포다. 

pubsub.png

 

이 패턴을 이용하여 우편 번호, 온도 및 상대 습도의 날씨 정보를 업데이트하는 것이 아래 예제이다.

 

서버

//
//  Weather update server in C++
//  Binds PUB socket to tcp://*:5556
//  Publishes random weather updates
//
//  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#if (defined (WIN32))
#include <zhelpers.hpp>
#endif

#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))

int main () {

    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
    publisher.bind("ipc://weather.ipc");                // Not usable on Windows.

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {

        int zipcode, temperature, relhumidity;

        //  Get values that will fool the boss
        zipcode     = within (100000);
        temperature = within (215) - 80;
        relhumidity = within (50) + 10;


        //  Send message to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20 ,
            "%05d %d %d", zipcode, temperature, relhumidity);
        publisher.send(message);
    }
    return 0;
}

 

클라이언트

//
//  Weather update client in C++
//  Connects SUB socket to tcp://localhost:5556
//  Collects weather updates and finds avg temp in zipcode
//
//  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <zmq.hpp>
#include <iostream>
#include <sstream>

int main (int argc, char *argv[])
{
    zmq::context_t context (1);

    //  Socket to talk to server
    std::cout << "Collecting updates from weather server…\n" << std::endl;
    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5556");

    //  Subscribe to zipcode, default is NYC, 10001
    const char *filter = (argc > 1)? argv [1]: "10001 ";
    subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));


    //  Process 100 updates
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {

        zmq::message_t update;
        int zipcode, temperature, relhumidity;

        subscriber.recv(&update);

        std::istringstream iss(static_cast<char*>(update.data()));
        iss >> zipcode >> temperature >> relhumidity ;

        total_temp += temperature;
    }
    std::cout     << "Average temperature for zipcode '"<< filter
                <<"' was "<<(int) (total_temp / update_nbr) <<"F"
                << std::endl;
    return 0;
}

 

SUB 소켓을 사용할 때 예제 코드에서와 같이 zmq_setsockopt() 및 SUBSCRIBE를 사용하여 토픽에 대한 구독을 설정해야 한다. 구독을 설정하지 않으면 메시지가 표시되지 않는다.

 

PUB-SUB 소켓 쌍은 비동기로 작동한다. 클라이언트는 메인 루프에서 zmq_recv()를 수행한다(또는 필요한 경우 한 번). SUB 소켓에 메시지를 보내려고하면 오류가 발생한다. 마찬가지로, 서비스는 필요한만큼 zmq_send()를 수행할 수 있지만 PUB 소켓에서 zmq_recv()를 수행해서는 안된다.

 

PUB-SUB 소켓에 대해 알아야 할 또 하나의 중요한 사항이 있다. 가입자가 언제 메시지를 받기 시작하는지 정확히 알지 못한다. 구독자를 시작하고 잠시 기다렸다가 게시자를 시작하더라도 구독자는 항상 게시자가 보내는 첫 번째 메시지를 놓친다는 것이다. Subscriber가 Publisher에 연결함에 따라 (작지만 0이 아닌 시간이 걸리므로)그동안 게시자가 이미 메시지를 보내고있을 수 있기 때문이다.

 

추후 Subscriber가 연결하고 준비되기까지 데이터를 발송하지 않도록 Publisher와 Subscriber를 동기화 하는 방법에 대해서 알아본다.

 

PUSH-PULL

푸시 앤 풀 소켓을 사용하면 파이프 라인으로 정렬 된 여러 작업자에게 메시지를 배포 할 수 있다. 푸시 소켓은 전송 된 메시지를 풀 클라이언트에 균등하게 배포한다. 이것은 Producer/Consumer 모델과 동일하지만 소비자가 계산한 결과는 업스트림이 아니라 다른 pull/소비자 소켓으로 다운스트림 전송된다. 파이프 라인 단계가 여러 노드에 연결되면 연결된 모든 노드간에 데이터가 로드밸런싱된다.

pushpull.png

 

아래 주소 페이지를 방문하면 다양한 고급 REQ-REP 패턴 조합을 학습할 수 있다.

http://zguide.zeromq.org/page:chapter3