Skip to content

Instantly share code, notes, and snippets.

@luca-drf
Last active March 25, 2021 11:52
Show Gist options
  • Select an option

  • Save luca-drf/ed9d0f1618c51f0eece5 to your computer and use it in GitHub Desktop.

Select an option

Save luca-drf/ed9d0f1618c51f0eece5 to your computer and use it in GitHub Desktop.
Simple load balancer using ZeroMQ
#include "zhelpers.hpp"
#include <queue>
// Broker (load balancer): sits between one client (frontend, ROUTER) and
// N workers (backend, ROUTER), routing each client request to the
// least-recently-used ready worker. Also answers heartbeat pings on a
// separate REP socket so clients can distinguish "busy" from "down".
int main(int argc, char *argv[])
{
// Prepare our context and sockets
zmq::context_t context(1);
zmq::socket_t frontend(context, ZMQ_ROUTER);
zmq::socket_t backend(context, ZMQ_ROUTER);
zmq::socket_t ping_rep(context, ZMQ_REP);
frontend.bind("tcp://*:5672"); // frontend
backend.bind("tcp://*:5673"); // backend
ping_rep.bind("tcp://*:5671"); // heartbeat / status endpoint
// Logic of LRU loop
// - Poll backend always, frontend only if 1+ worker ready
// - If worker replies, queue worker as ready and forward reply
// to client if necessary
// - If client requests, pop next worker and send request to it
//
// FIFO of identities of workers that are ready for a new job.
std::queue<std::string> worker_queue;
while (1) {
// Initialize poll set. Item order matters: polling only the first
// two entries (below) skips the frontend when no worker is ready.
zmq::pollitem_t items[] = {
{ ping_rep, 0, ZMQ_POLLIN, 0 },
// Always poll for worker activity on backend
{ backend, 0, ZMQ_POLLIN, 0 },
// Poll front-end only if we have available workers
{ frontend, 0, ZMQ_POLLIN, 0 }
};
if (!worker_queue.empty()) {
zmq::poll(&items[0], 3, -1);
//std::cout << "Poll front and back" << std::endl;
}
else {
// No ready worker: ignore the frontend so incoming client
// requests queue up inside ZeroMQ until a worker frees up.
zmq::poll(&items[0], 2, -1);
//std::cout << "Poll back" << std::endl;
}
// Reply to heartbeat ping
if (items[0].revents & ZMQ_POLLIN) {
//std::cout << "Receive ping" << std::endl;
{
// The ping payload is a single empty frame (see ping_host).
std::string empty = s_recv(ping_rep);
assert(empty.size() == 0);
}
// BUSY means "alive but no worker free"; OK means a worker is ready.
if (worker_queue.empty()) {
s_send(ping_rep, "BUSY");
//std::cout << "Reply: BUSY" << std::endl;
}
else {
s_send(ping_rep, "OK");
//std::cout << "Reply: OK" << std::endl;
}
}
// Handle worker activity on backend
// Poll backend
if (items[1].revents & ZMQ_POLLIN) {
//std::cout << "Receive from backend" << std::endl;
// FRAME 1: Added by broker ROUTER socket on entering
// Queue worker address for LRU routing
std::string worker = s_recv(backend);
worker_queue.push(worker);
//std::cout << "Queue worker" << worker << std::endl;
{
// FRAME 2: Added by worker REQ socket on exiting
// Second frame is empty
std::string empty = s_recv(backend);
assert(empty.size() == 0);
}
// FRAME 3: Added by worker
// Third frame is READY or else a client reply address
std::string client_addr = s_recv(backend);
// If client reply, send rest back to frontend
if (client_addr.compare("READY") != 0) {
{
// FRAME 4: Added by worker
// Fourth frame is empty
std::string empty = s_recv(backend);
assert(empty.size() == 0);
}
// FRAME 5
// Fifth frame is the reply
std::string reply = s_recv(backend);
// Forward as [client addr][empty][reply]; the frontend ROUTER
// strips the address frame on the way out to the client.
s_sendmore(frontend, client_addr);
s_sendmore(frontend, "");
s_send(frontend, reply);
std::cout << "Worker: " << worker
<< " reply: " << reply
<< " to: " << client_addr << std::endl;
}
}
// Poll frontend
if (items[2].revents & ZMQ_POLLIN) {
//std::cout << "Receive from frontend" << std::endl;
// FRAME 1: Added by ROUTER socket on entering
// Now get next client request, route to LRU worker
// Client request is [address][empty][request]
std::string client_addr = s_recv(frontend);
{
// FRAME 2: Added by client REQ socket on exit
// Frame 2 is empty
std::string empty = s_recv(frontend);
assert(empty.size() == 0);
}
// FRAME 3: Added by client application
// Request
std::string request = s_recv(frontend);
// Pop the least-recently-used ready worker (front of the FIFO).
std::string worker_addr = worker_queue.front();//worker_queue [0];
worker_queue.pop();
// FRAME 1: Chopped by broker ROUTER socket on exit
s_sendmore(backend, worker_addr);
// FRAME 2: Chopped by worker REQ socket on entering
s_sendmore(backend, "");
// FRAME 3: Client addr, processed in worker application
s_sendmore(backend, client_addr);
// FRAME 4: empty
s_sendmore(backend, "");
// FRAME 5: Client request
s_send(backend, request);
std::cout << "Client: " << client_addr
<< " request: " << request
<< " to: " << worker_addr << std::endl;
}
}
return 0;
}
// Basic client using DEALER socket
//
#include "zhelpers.hpp"
#define REQUEST_TIMEOUT 2500 // (msec.)
#define RETRIES 3
std::string ping_host(std::string address, zmq::context_t & context, int timeout, int retries = 0) {
int retry = 0;
int linger = 0;
std::cout << "Ping host" << std::endl;
do {
timeout = timeout * (retry + 1);
zmq::socket_t host(context, ZMQ_REQ);
host.connect(address);
host.setsockopt(ZMQ_LINGER, &linger, sizeof (int));
host.setsockopt(ZMQ_SNDTIMEO, &timeout, sizeof(int));
host.setsockopt(ZMQ_RCVTIMEO, &timeout, sizeof(int));
zmq::message_t message;
host.send(message);
if(!host.recv(&message)) {
std::cout << "Retry..." << std::endl;
retry++;
}
else {
return std::string(static_cast<char*>(message.data()), message.size());
}
} while (retry <= retries);
return "DOWN";
}
// Client: pushes total_jobs jobs to the broker over a DEALER socket,
// collects the replies asynchronously, and on timeout pings the broker's
// heartbeat endpoint to distinguish a busy broker from a dead one.
int main(int argc, char *argv[])
{
zmq::context_t context(1);
zmq::socket_t broker(context, ZMQ_DEALER);
// Set a printable identity (Not mandatory)
std::string id = "Client";
broker.setsockopt(ZMQ_IDENTITY, id.c_str(), id.length());
broker.connect("tcp://localhost:5672"); // frontend
std::string host = "tcp://localhost:5671"; // broker heartbeat endpoint
int total_jobs = 10;
int jobs_recv = 0;
int jobs_sent = 0;
int status = 0; // 0: Poll IN and OUT, 1: Poll IN only
while (1) {
// Two alternative poll entries for the SAME socket; `status`
// selects which single entry is polled on this pass.
zmq::pollitem_t items[] = {
// {zmq_socket, posix_socket, events, revents }
{ broker, 0, (ZMQ_POLLIN | ZMQ_POLLOUT), 0 },
{ broker, 0, ZMQ_POLLIN, 0 },
};
// Once every job has been sent, stop polling for writability and
// only wait for the remaining replies.
if (status == 0 && jobs_sent == total_jobs) {
status = 1;
}
zmq::poll(&items[status], 1, REQUEST_TIMEOUT); // Blocking
if (items[status].revents & ZMQ_POLLIN) {
{
// Replies arrive as [empty delimiter][payload] (REQ-style
// envelope, produced by the broker's forwarding code).
std::string empty = s_recv(broker);
assert(empty.size() == 0);
}
std::string reply = s_recv(broker);
std::cout << "Client received reply: " << reply << std::endl;
jobs_recv++;
if (jobs_recv == total_jobs) {
std::cout << "Done!" << std::endl;
break;
}
}
if (items[status].revents & ZMQ_POLLOUT) {
// Send [empty][job number], mimicking a REQ envelope so the
// broker/worker chain can treat DEALER and REQ clients alike.
s_sendmore(broker, "");
s_send(broker, std::to_string(jobs_sent).c_str());
std::cout << "Client sent job: " << jobs_sent << std::endl;
jobs_sent++;
}
// Neither readable nor writable within REQUEST_TIMEOUT: probe the
// broker's heartbeat socket to tell "busy" apart from "down".
if (!(items[status].revents & ZMQ_POLLOUT) &&
!(items[status].revents & ZMQ_POLLIN)) {
std::cout << "Timeout" << std::endl;
std::string host_stat = ping_host(host, context, REQUEST_TIMEOUT, RETRIES);
std::cout << "Host stat: " << host_stat << std::endl;
if (host_stat == "DOWN") {
std::cout << "Host not responding!" << std::endl;
break;
}
if (jobs_sent == total_jobs && host_stat == "OK") {
// Resend unfinished jobs (placeholder: not implemented here)
std::cout << "Resend unfinished jobs" << std::endl;
}
}
}
return 0;
}
// Worker using REQ socket to do LRU routing
//
#include "zhelpers.hpp"
#include <stdlib.h> /* srand, rand */
#include <time.h> /* time */
// Worker: announces itself to the broker with "READY", then serves jobs
// forever. The "work" is simulated by a random sleep, and the request
// payload is echoed back verbatim as the reply.
int main(int argc, char *argv[]) {
zmq::context_t context(1);
zmq::socket_t broker(context, ZMQ_REQ);
// Give the socket a human-readable identity (optional; the ROUTER
// would otherwise assign a random one). Randomized so several
// workers can run side by side.
srand (time(NULL));
std::string identity = "Worker-" + std::to_string(rand() % 100);
broker.setsockopt(ZMQ_IDENTITY, identity.c_str(), identity.length());
broker.connect("tcp://localhost:5673"); // backend
// Tell backend we're ready for work
s_send(broker, "READY");
for (;;) {
// Incoming envelope from the broker: [client address][empty][request]
std::string reply_to = s_recv(broker);
{
std::string delimiter = s_recv(broker);
assert(delimiter.size() == 0);
}
std::string job = s_recv(broker);
std::cout << "Worker got job: " << job << std::endl;
// Simulate a variable amount of work (0-6 seconds).
sleep(rand() % 7);
// Route the reply back through the broker to the originating client.
s_sendmore(broker, reply_to);
s_sendmore(broker, "");
s_send(broker, job.c_str());
std::cout << "Worker sent job: " << job << std::endl;
}
return 0;
}
@luca-drf
Copy link
Author

Overview

This is a basic network architecture for distributing tasks. It is composed of three parts:

  • Client: Generates tasks and collects the solved tasks
  • Broker: Dispatches tasks and replies between the client and the workers
  • Worker: Receives one task at a time from the broker, solves it, and sends the result back

Client | ZMQ_DEALER <----> ZMQ_ROUTER | Broker | ZMQ_ROUTER <----> ZMQ_REQ | Worker

The communication is completely asynchronous.

Although this example is implemented to run with one client, one broker and several workers, the overall architecture can easily be tweaked to run with an arbitrary number of clients, brokers, and workers. In my opinion, though, the best arrangement is one broker per client, with many workers.

Installation

Requirements:

  • C++11 Compiler
  • ZeroMQ
  • ZeroMQ helpers
  • ZeroMQ C++ bindings

First copy broker.cpp, client.cpp and worker.cpp into a folder named zmq_broker

Install ZeroMQ

  1. Download ZeroMQ 4.1.2 Posix Tarball
  2. tar xzf zeromq-4.1.2.tar.gz
  3. cd zeromq-4.1.2
  4. ./configure --without-libsodium --prefix=~/usr (might require absolute path)
  5. make
  6. make install

Install ZeroMQ helpers

  1. git clone https://github.com/imatix/zguide
  2. cd <path to zmq_broker>
  3. ln -s <path to zguide>/examples/C++/zhelpers.hpp

Install ZeroMQ C++ bindings

  1. git clone https://github.com/zeromq/cppzmq
  2. cd ~/usr/include
  3. ln -s <path to cppzmq>/zmq.hpp

Compile

  • broker clang++ -I ~/usr/include -L ~/usr/lib broker.cpp -o broker -lzmq
  • client clang++ -I ~/usr/include -L ~/usr/lib client.cpp -o client -lzmq
  • worker clang++ -I ~/usr/include -L ~/usr/lib worker.cpp -o worker -lzmq

Run

Run separate processes for each component following this order:

  1. run ./broker
  2. run ./worker (Run at least two workers in two different sessions)
  3. run ./client
  4. Enjoy :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment