ØMQ는 자체 메시징 인프라를 구축 할 수있는 깔끔한 메시징 라이브러리다. 소켓보다는 수준 높은 개념이며 메시징 시스템을 구축하기 위해 가능한 한 낮은 수준의 개념 프레임워크를 제공한다.
전용 메시지 브로커없이 로드밸런싱을 지원하며 메시지 큐를 제공한다. 라이브러리의 API는 버클리 소켓의 API와 유사하도록 설계되었다.
무엇보다 ØMQ의 가장 큰 장점은 크로스 플랫폼으로 동작할 수 있다는 점일 것이다.
오픈소스 라이센스로는 LGPLv3로 사용할 수 있으며 다양한 프로그래밍 언어 바인딩이 존재한다.
C++로 사용할 수 있는 바인딩에는 다음과 같은 것들이 있다.
ZeroMQ에는 메시징 패턴이 있는데 상호연결된 시스템 간의 통신 흐름을 설명하는 네트워크 지향 아키텍처 패턴이다. ZeroMQ는 이러한 패턴을 활용할 수 있도록 사전 최적화된 소켓을 제공한다. 어떤 시스템이 서로 연결될 수 있는지, 그리고 그들 사이의 의사소통의 흐름은 무엇인지를 나타낸다.
ZeroMQ 패턴을 이해하려면 소켓 유형과 그 작동 방식을 이해해야한다. 기본으로 제공하는 핵심적인 ØMQ 패턴은 다음과 같다 :
Request-reply : 클라이언트와 서비스의 집합을 연결하는 패턴.
Publish-subscribe : publisher와 subscriber 집합을 연결하는 패턴.
Pipeline : Push/Pull 소켓 쌍으로 단방향 통신에 사용된다.
다음은 ZeroMQ로 사용가능한 연결 쌍(양쪽이 바인딩 할 수 있음)으로 유효한 소켓조합이다 :
PUB and SUB
REQ and REP
REQ and ROUTER
DEALER and REP
DEALER and ROUTER
DEALER and DEALER
ROUTER and ROUTER
PUSH and PULL
PAIR and PAIR
REQ-REP
REQ-REP 패턴은 클라이언트/서버 모델이며 가장 기본적인 패턴에 해당한다. 클라이언트는 요청을 보내고 서버는 요청에 응답한다.
클라이언트는 한 반복문 내에서 zmq_send()을 호출한 뒤에 zmq_recv()를 호출한다 (혹은 필요하다면 한번만). 다른 시퀀스를 수행하는 것 (예를 들어 한번에 두 개의 메시지를 보내는 것) 은 send 혹은 recv 호출로부터 -1의 반환 코드를 얻게될 것이다.
//
// Hello World server in C++
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
int main () {
// Prepare our context and socket
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REP);
socket.bind ("tcp://*:5555");
while (true) {
zmq::message_t request;
// Wait for next request from client
socket.recv (&request);
std::cout << "Received Hello" << std::endl;
// Do some 'work'
sleep (1);
// Send reply back to client
zmq::message_t reply (5);
memcpy ((void *) reply.data (), "World", 5);
socket.send (reply);
}
return 0;
}
PUB-SUB
두 번째 고전적인 패턴은 서버가 클라이언트들에게 정보를 PUSH하는 단방향 데이터 배포다.
이 패턴을 이용하여 우편 번호, 온도 및 상대 습도의 날씨 정보를 업데이트하는 것이 아래 예제이다.
서버
//
// Weather update server in C++
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#if (defined (WIN32))
#include <zhelpers.hpp>
#endif
#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
int main () {
// Prepare our context and publisher
zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind("tcp://*:5556");
publisher.bind("ipc://weather.ipc"); // Not usable on Windows.
// Initialize random number generator
srandom ((unsigned) time (NULL));
while (1) {
int zipcode, temperature, relhumidity;
// Get values that will fool the boss
zipcode = within (100000);
temperature = within (215) - 80;
relhumidity = within (50) + 10;
// Send message to all subscribers
zmq::message_t message(20);
snprintf ((char *) message.data(), 20 ,
"%05d %d %d", zipcode, temperature, relhumidity);
publisher.send(message);
}
return 0;
}
클라이언트
//
// Weather update client in C++
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <zmq.hpp>
#include <iostream>
#include <sstream>
int main (int argc, char *argv[])
{
zmq::context_t context (1);
// Socket to talk to server
std::cout << "Collecting updates from weather server…\n" << std::endl;
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
// Subscribe to zipcode, default is NYC, 10001
const char *filter = (argc > 1)? argv [1]: "10001 ";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
zmq::message_t update;
int zipcode, temperature, relhumidity;
subscriber.recv(&update);
std::istringstream iss(static_cast<char*>(update.data()));
iss >> zipcode >> temperature >> relhumidity ;
total_temp += temperature;
}
std::cout << "Average temperature for zipcode '"<< filter
<<"' was "<<(int) (total_temp / update_nbr) <<"F"
<< std::endl;
return 0;
}
SUB 소켓을 사용할 때 예제 코드에서와 같이 zmq_setsockopt() 및 SUBSCRIBE를 사용하여 토픽에 대한 구독을 설정해야 한다. 구독을 설정하지 않으면 메시지가 표시되지 않는다.
PUB-SUB 소켓 쌍은 비동기로 작동한다. 클라이언트는 메인 루프에서 zmq_recv()를 수행한다(또는 필요한 경우 한 번). SUB 소켓에 메시지를 보내려고하면 오류가 발생한다. 마찬가지로, 서비스는 필요한만큼 zmq_send()를 수행할 수 있지만 PUB 소켓에서 zmq_recv()를 수행해서는 안된다.
PUB-SUB 소켓에 대해 알아야 할 또 하나의 중요한 사항이 있다. 가입자가 언제 메시지를 받기 시작하는지 정확히 알지 못한다. 구독자를 시작하고 잠시 기다렸다가 게시자를 시작하더라도 구독자는 항상 게시자가 보내는 첫 번째 메시지를 놓친다는 것이다. Subscriber가 Publisher에 연결함에 따라 (작지만 0이 아닌 시간이 걸리므로)그동안 게시자가 이미 메시지를 보내고있을 수 있기 때문이다.
추후 Subscriber가 연결하고 준비되기까지 데이터를 발송하지 않도록 Publisher와 Subscriber를 동기화 하는 방법에 대해서 알아본다.
PUSH-PULL
푸시 앤 풀 소켓을 사용하면 파이프 라인으로 정렬 된 여러 작업자에게 메시지를 배포 할 수 있다. 푸시 소켓은 전송 된 메시지를 풀 클라이언트에 균등하게 배포한다. 이것은 Producer/Consumer 모델과 동일하지만 소비자가 계산한 결과는 업스트림이 아니라 다른 pull/소비자 소켓으로 다운스트림 전송된다. 파이프 라인 단계가 여러 노드에 연결되면 연결된 모든 노드간에 데이터가 로드밸런싱된다.
아래 주소 페이지를 방문하면 다양한 고급 REQ-REP 패턴 조합을 학습할 수 있다.