Last active
March 25, 2021 11:52
-
-
Save luca-drf/ed9d0f1618c51f0eece5 to your computer and use it in GitHub Desktop.
Simple load balancer using ZeroMQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include "zhelpers.hpp" | |
| #include <queue> | |
| int main(int argc, char *argv[]) | |
| { | |
| // Prepare our context and sockets | |
| zmq::context_t context(1); | |
| zmq::socket_t frontend(context, ZMQ_ROUTER); | |
| zmq::socket_t backend(context, ZMQ_ROUTER); | |
| zmq::socket_t ping_rep(context, ZMQ_REP); | |
| frontend.bind("tcp://*:5672"); // frontend | |
| backend.bind("tcp://*:5673"); // backend | |
| ping_rep.bind("tcp://*:5671"); | |
| // Logic of LRU loop | |
| // - Poll backend always, frontend only if 1+ worker ready | |
| // - If worker replies, queue worker as ready and forward reply | |
| // to client if necessary | |
| // - If client requests, pop next worker and send request to it | |
| // | |
| // A very simple queue structure with known max size | |
| std::queue<std::string> worker_queue; | |
| while (1) { | |
| // Initialize poll set | |
| zmq::pollitem_t items[] = { | |
| { ping_rep, 0, ZMQ_POLLIN, 0 }, | |
| // Always poll for worker activity on backend | |
| { backend, 0, ZMQ_POLLIN, 0 }, | |
| // Poll front-end only if we have available workers | |
| { frontend, 0, ZMQ_POLLIN, 0 } | |
| }; | |
| if (!worker_queue.empty()) { | |
| zmq::poll(&items[0], 3, -1); | |
| //std::cout << "Poll front and back" << std::endl; | |
| } | |
| else { | |
| zmq::poll(&items[0], 2, -1); | |
| //std::cout << "Poll back" << std::endl; | |
| } | |
| // Reply to heartbeat ping | |
| if (items[0].revents & ZMQ_POLLIN) { | |
| //std::cout << "Receive ping" << std::endl; | |
| { | |
| std::string empty = s_recv(ping_rep); | |
| assert(empty.size() == 0); | |
| } | |
| if (worker_queue.empty()) { | |
| s_send(ping_rep, "BUSY"); | |
| //std::cout << "Reply: BUSY" << std::endl; | |
| } | |
| else { | |
| s_send(ping_rep, "OK"); | |
| //std::cout << "Reply: OK" << std::endl; | |
| } | |
| } | |
| // Handle worker activity on backend | |
| // Poll backend | |
| if (items[1].revents & ZMQ_POLLIN) { | |
| //std::cout << "Receive from backend" << std::endl; | |
| // FRAME 1: Added by broker ROUTER socket on entering | |
| // Queue worker address for LRU routing | |
| std::string worker = s_recv(backend); | |
| worker_queue.push(worker); | |
| //std::cout << "Queue worker" << worker << std::endl; | |
| { | |
| // FRAME 2: Added by worker REQ socket on exiting | |
| // Second frame is empty | |
| std::string empty = s_recv(backend); | |
| assert(empty.size() == 0); | |
| } | |
| // FRAME 3: Added by worker | |
| // Third frame is READY or else a client reply address | |
| std::string client_addr = s_recv(backend); | |
| // If client reply, send rest back to frontend | |
| if (client_addr.compare("READY") != 0) { | |
| { | |
| // FRAME 4: Added by worker | |
| // Fourth frame is empty | |
| std::string empty = s_recv(backend); | |
| assert(empty.size() == 0); | |
| } | |
| // FRAME 5 | |
| // Fifth frame is the reply | |
| std::string reply = s_recv(backend); | |
| s_sendmore(frontend, client_addr); | |
| s_sendmore(frontend, ""); | |
| s_send(frontend, reply); | |
| std::cout << "Worker: " << worker | |
| << " reply: " << reply | |
| << " to: " << client_addr << std::endl; | |
| } | |
| } | |
| // Poll frontend | |
| if (items[2].revents & ZMQ_POLLIN) { | |
| //std::cout << "Receive from frontend" << std::endl; | |
| // FRAME 1: Added by ROUTER socket on entering | |
| // Now get next client request, route to LRU worker | |
| // Client request is [address][empty][request] | |
| std::string client_addr = s_recv(frontend); | |
| { | |
| // FRAME 2: Added by client REQ socket on exit | |
| // Frame 2 is empty | |
| std::string empty = s_recv(frontend); | |
| assert(empty.size() == 0); | |
| } | |
| // FRAME 3: Added by client application | |
| // Request | |
| std::string request = s_recv(frontend); | |
| std::string worker_addr = worker_queue.front();//worker_queue [0]; | |
| worker_queue.pop(); | |
| // FRAME 1: Chopped by broker ROUTER socket on exit | |
| s_sendmore(backend, worker_addr); | |
| // FRAME 2: Chopped by worker REQ socket on entering | |
| s_sendmore(backend, ""); | |
| // FRAME 3: Client addr, processed in worker application | |
| s_sendmore(backend, client_addr); | |
| // FRAME 4: empty | |
| s_sendmore(backend, ""); | |
| // FRAME 5: Client request | |
| s_send(backend, request); | |
| std::cout << "Client: " << client_addr | |
| << " request: " << request | |
| << " to: " << worker_addr << std::endl; | |
| } | |
| } | |
| return 0; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Basic client using DEALER socket | |
| // | |
| #include "zhelpers.hpp" | |
| #define REQUEST_TIMEOUT 2500 // (msec.) | |
| #define RETRIES 3 | |
| std::string ping_host(std::string address, zmq::context_t & context, int timeout, int retries = 0) { | |
| int retry = 0; | |
| int linger = 0; | |
| std::cout << "Ping host" << std::endl; | |
| do { | |
| timeout = timeout * (retry + 1); | |
| zmq::socket_t host(context, ZMQ_REQ); | |
| host.connect(address); | |
| host.setsockopt(ZMQ_LINGER, &linger, sizeof (int)); | |
| host.setsockopt(ZMQ_SNDTIMEO, &timeout, sizeof(int)); | |
| host.setsockopt(ZMQ_RCVTIMEO, &timeout, sizeof(int)); | |
| zmq::message_t message; | |
| host.send(message); | |
| if(!host.recv(&message)) { | |
| std::cout << "Retry..." << std::endl; | |
| retry++; | |
| } | |
| else { | |
| return std::string(static_cast<char*>(message.data()), message.size()); | |
| } | |
| } while (retry <= retries); | |
| return "DOWN"; | |
| } | |
| int main(int argc, char *argv[]) | |
| { | |
| zmq::context_t context(1); | |
| zmq::socket_t broker(context, ZMQ_DEALER); | |
| // Set a printable identity (Not mandatory) | |
| std::string id = "Client"; | |
| broker.setsockopt(ZMQ_IDENTITY, id.c_str(), id.length()); | |
| broker.connect("tcp://localhost:5672"); // frontend | |
| std::string host = "tcp://localhost:5671"; | |
| int total_jobs = 10; | |
| int jobs_recv = 0; | |
| int jobs_sent = 0; | |
| int status = 0; // 0: Poll IN and OUT, 1: Poll IN only | |
| while (1) { | |
| zmq::pollitem_t items[] = { | |
| // {zmq_socket, posix_socket, events, revents } | |
| { broker, 0, (ZMQ_POLLIN | ZMQ_POLLOUT), 0 }, | |
| { broker, 0, ZMQ_POLLIN, 0 }, | |
| }; | |
| if (status == 0 && jobs_sent == total_jobs) { | |
| status = 1; | |
| } | |
| zmq::poll(&items[status], 1, REQUEST_TIMEOUT); // Blocking | |
| if (items[status].revents & ZMQ_POLLIN) { | |
| { | |
| std::string empty = s_recv(broker); | |
| assert(empty.size() == 0); | |
| } | |
| std::string reply = s_recv(broker); | |
| std::cout << "Client received reply: " << reply << std::endl; | |
| jobs_recv++; | |
| if (jobs_recv == total_jobs) { | |
| std::cout << "Done!" << std::endl; | |
| break; | |
| } | |
| } | |
| if (items[status].revents & ZMQ_POLLOUT) { | |
| s_sendmore(broker, ""); | |
| s_send(broker, std::to_string(jobs_sent).c_str()); | |
| std::cout << "Client sent job: " << jobs_sent << std::endl; | |
| jobs_sent++; | |
| } | |
| if (!(items[status].revents & ZMQ_POLLOUT) && | |
| !(items[status].revents & ZMQ_POLLIN)) { | |
| std::cout << "Timeout" << std::endl; | |
| std::string host_stat = ping_host(host, context, REQUEST_TIMEOUT, RETRIES); | |
| std::cout << "Host stat: " << host_stat << std::endl; | |
| if (host_stat == "DOWN") { | |
| std::cout << "Host not responding!" << std::endl; | |
| break; | |
| } | |
| if (jobs_sent == total_jobs && host_stat == "OK") { | |
| // Resend unfinished jobs | |
| std::cout << "Resend unfinished jobs" << std::endl; | |
| } | |
| } | |
| } | |
| return 0; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Worker using REQ socket to do LRU routing | |
| // | |
| #include "zhelpers.hpp" | |
| #include <stdlib.h> /* srand, rand */ | |
| #include <time.h> /* time */ | |
| int main(int argc, char *argv[]) { | |
| zmq::context_t context(1); | |
| zmq::socket_t broker(context, ZMQ_REQ); | |
| // Set a printable identity (Not mandatory) | |
| std::string id = "Worker-"; | |
| srand (time(NULL)); | |
| id += std::to_string(rand() % 100); | |
| broker.setsockopt(ZMQ_IDENTITY, id.c_str(), id.length()); | |
| broker.connect("tcp://localhost:5673"); // backend | |
| // Tell backend we're ready for work | |
| s_send(broker, "READY"); | |
| while (1) { | |
| // Read and save all frames until we get an empty frame | |
| // In this example there is only 1 but it could be more | |
| std::string client_address = s_recv(broker); | |
| { | |
| std::string empty = s_recv(broker); | |
| assert(empty.size() == 0); | |
| } | |
| // Get request, send reply | |
| std::string request = s_recv(broker); | |
| std::cout << "Worker got job: " << request << std::endl; | |
| sleep(rand() % 7); | |
| s_sendmore(broker, client_address); | |
| s_sendmore(broker, ""); | |
| s_send(broker, request.c_str()); | |
| std::cout << "Worker sent job: " << request << std::endl; | |
| } | |
| return 0; | |
| } |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Overview
This is a basic network architecture for distributing tasks. It is composed of three parts:
Client | ZMQ_DEALER <----> ZMQ_ROUTER | Broker | ZMQ_ROUTER <----> ZMQ_REQ | Worker
The communication is completely asynchronous.
Although this example is implemented to run with one client, one broker and several workers, the overall architecture can be easily tweaked to run with an arbitrary number of clients, brokers and workers together even though the best choice in my opinion is to have one broker for each client and many workers.
Installation
Requirements:
First copy broker.cpp, client.cpp and worker.cpp into a folder named zmq_broker.
Install ZeroMQ
tar xzf zeromq-4.1.2.tar.gz
cd zeromq-4.1.2
./configure --without-libsodium --prefix=~/usr (might require absolute path)
make
make install
Install ZeroMQ helpers
git clone https://github.com/imatix/zguide
cd <path to zmq_broker>
ln -s <path to zguide>/examples/C++/zhelpers.hpp
Install ZeroMQ C++ bindings
git clone https://github.com/zeromq/cppzmq
cd ~/usr/include
ln -s <path to cppzmq>/zmq.hpp
Compile
clang++ -I ~/usr/include -L ~/usr/lib broker.cpp -o broker -lzmq
clang++ -I ~/usr/include -L ~/usr/lib client.cpp -o client -lzmq
clang++ -I ~/usr/include -L ~/usr/lib worker.cpp -o worker -lzmq
Run
Run a separate process for each component, in this order:
./broker
./worker (run at least two workers in two different sessions)
./client