ZeroMQ 비동기 클라이언트와 서버 패턴에 대한 내용으로 ZeroMQ 공식 문서를 기반으로 필요한 내용만 정리한 것이다.
이 전 글에서 ZeroMQ의 기본적인 패턴을 소개하였다. REQ-REP 패턴 위에 ROUTER와 DEALER 를 조합하는 고급 패턴을 소개한다.
ROUTER-to-DEALER 프록시를 사용하면 REQ-REP 조합을 확장하여 완전한 비동기 클라이언트 및 서버 구조를 설계할 수 있다.
REQ와 DEALER의 두 가지 구체적인 차이점을 먼저 보자.
다은은 기본적인 ROUTER to DEALER 패턴 C++ 예제 소스코드이다.
// // Custom routing Router to Dealer // // Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com> #include "zhelpers.hpp" #include <pthread.h> static void * worker_task(void *args) { zmq::context_t context(1); zmq::socket_t worker(context, ZMQ_DEALER); #if (defined (WIN32)) s_set_id(worker, (intptr_t)args); #else s_set_id(worker); // Set a printable identity #endif worker.connect("tcp://localhost:5671"); int total = 0; while (1) { // Tell the broker we're ready for work s_sendmore(worker, ""); s_send(worker, "Hi Boss"); // Get workload from broker, until finished s_recv(worker); // Envelope delimiter std::string workload = s_recv(worker); if ("Fired!" == workload) { std::cout << "Completed: " << total << " tasks" << std::endl; break; } total++; // Do some random work s_sleep(within(500) + 1); } return NULL; } // While this example runs in a single process, that is just to make // it easier to start and stop the example. Each thread has its own // context and conceptually acts as a separate process. int main() { zmq::context_t context(1); zmq::socket_t broker(context, ZMQ_ROUTER); broker.bind("tcp://*:5671"); srandom((unsigned)time(NULL)); const int NBR_WORKERS = 10; pthread_t workers[NBR_WORKERS]; for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; ++worker_nbr) { pthread_create(workers + worker_nbr, NULL, worker_task, (void *)(intptr_t)worker_nbr); } // Run for five seconds and then tell workers to end int64_t end_time = s_clock() + 5000; int workers_fired = 0; while (1) { // Next message gives us least recently used worker std::string identity = s_recv(broker); { s_recv(broker); // Envelope delimiter s_recv(broker); // Response from worker } s_sendmore(broker, identity); s_sendmore(broker, ""); // Encourage workers until it's time to fire them if (s_clock() < end_time) s_send(broker, "Work harder"); else { s_send(broker, "Fired!"); if (++workers_fired == NBR_WORKERS) break; } } for (int worker_nbr = 0; worker_nbr < NBR_WORKERS; ++worker_nbr) { pthread_join(workers[worker_nbr], NULL); } return 0; }
ROUTER to DEALER 의 패턴으로 한 서버가 여러 작업자와 비동기식으로 대화하는 1 대 N 사용 사례를 다양한 클라이언트가 단일 서버와 통신하고 이를 비동기적으로 수행하는 N 대 1 아키텍처로 뒤집을 수 있다.
작동 방식은 다음과 같다.
다음은 비동기 client/server in C++ 예제 소스코드이다.
// Asynchronous client-to-server (DEALER to ROUTER) // // While this example runs in a single process, that is to make // it easier to start and stop the example. Each task has its own // context and conceptually acts as a separate process. #include <vector> #include <thread> #include <memory> #include <functional> #include <zmq.hpp> #include "zhelpers.hpp" // This is our client task class. // It connects to the server, and then sends a request once per second // It collects responses as they arrive, and it prints them out. We will // run several client tasks in parallel, each with a different random ID. // Attention! -- this random work well only on linux. class client_task { public: client_task() : ctx_(1), client_socket_(ctx_, ZMQ_DEALER) {} void start() { // generate random identity char identity[10] = {}; sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000)); printf("%s\n", identity); client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity)); client_socket_.connect("tcp://localhost:5570"); zmq::pollitem_t items[] = { { static_cast<void*>(client_socket_), 0, ZMQ_POLLIN, 0 } }; int request_nbr = 0; try { while (true) { for (int i = 0; i < 100; ++i) { // 10 milliseconds zmq::poll(items, 1, 10); if (items[0].revents & ZMQ_POLLIN) { printf("\n%s ", identity); s_dump(client_socket_); } } char request_string[16] = {}; sprintf(request_string, "request #%d", ++request_nbr); client_socket_.send(request_string, strlen(request_string)); } } catch (std::exception &e) {} } private: zmq::context_t ctx_; zmq::socket_t client_socket_; }; // Each worker task works on one request at a time and sends a random number // of replies back, with random delays between replies: class server_worker { public: server_worker(zmq::context_t &ctx, int sock_type) : ctx_(ctx), worker_(ctx_, sock_type) {} void work() { worker_.connect("inproc://backend"); try { while (true) { zmq::message_t identity; zmq::message_t msg; zmq::message_t copied_id; zmq::message_t copied_msg; worker_.recv(&identity); worker_.recv(&msg); int replies = within(5); for (int reply = 0; reply < replies; ++reply) { s_sleep(within(1000) + 1); copied_id.copy(&identity); copied_msg.copy(&msg); worker_.send(copied_id, ZMQ_SNDMORE); worker_.send(copied_msg); } } } catch (std::exception &e) {} } private: zmq::context_t &ctx_; zmq::socket_t worker_; }; // This is our server task. // It uses the multithreaded server model to deal requests out to a pool // of workers and route replies back to clients. One worker can handle // one request at a time but one client can talk to multiple workers at // once. class server_task { public: server_task() : ctx_(1), frontend_(ctx_, ZMQ_ROUTER), backend_(ctx_, ZMQ_DEALER) {} enum { kMaxThread = 5 }; void run() { frontend_.bind("tcp://*:5570"); backend_.bind("inproc://backend"); std::vector<server_worker *> worker; std::vector<std::thread *> worker_thread; for (int i = 0; i < kMaxThread; ++i) { worker.push_back(new server_worker(ctx_, ZMQ_DEALER)); worker_thread.push_back(new std::thread(std::bind(&server_worker::work, worker))); worker_thread->detach(); } try { zmq::proxy(static_cast<void*>(frontend_), static_cast<void*>(backend_), nullptr); } catch (std::exception &e) {} for (int i = 0; i < kMaxThread; ++i) { delete worker; delete worker_thread; } } private: zmq::context_t ctx_; zmq::socket_t frontend_; zmq::socket_t backend_; }; // The main thread simply starts several clients and a server, and then // waits for the server to finish. int main (void) { client_task ct1; client_task ct2; client_task ct3; server_task st; std::thread t1(std::bind(&client_task::start, &ct1)); std::thread t2(std::bind(&client_task::start, &ct2)); std::thread t3(std::bind(&client_task::start, &ct3)); std::thread t4(std::bind(&server_task::run, &st)); t1.detach(); t2.detach(); t3.detach(); t4.detach(); getchar(); return 0; }
예제에서 서버는 워커 스레드를 만들고 스레드는 하나의 요청을 동기적으로 처리한다. 내부 대기열을 사용하여 이를 프론트 엔드 소켓에 연결한다. zmq_proxy() 함수를 호출하여 프론트엔드 및 백엔드 소켓을 연결한다.
클라이언트와 서버 사이에서 DEALER to ROUTER 패턴으로 통신하고 있지만 서버 내부의 메인 스레드에서는 DEALER to DEALER 를 사용하고 있다. 워커가 엄격하게 동기화 되어야할 필요가 있는 경우 REP 를 사용할 수 있지만 여러 개의 답장을 비동기로 보내려면 DEALER 소켓이 필요하다. 다음은 이것을 수평적으로 나타낸 것이다.
클라이언트는 단일 프레임으로 구성된 메시지를 보낸다. 서버 스레드는 2 프레임 메시지 (클라이언트 ID가 접두어로 붙은 원본 메시지)를 받는다. 그런 다음 워커는 첫 번째 프레임을 ID로 사용하여 두 번째 프레임에 데이터를 넣고 클라이언트에 다시 보낼 수 있다. ROUTER는 메세지를 해당 클라이언트로 라우팅 한다.