Skip to content

Instantly share code, notes, and snippets.

@msr1k
Created July 4, 2019 10:14
Show Gist options
  • Select an option

  • Save msr1k/ae727ad553c0b77a16668135f223a1b4 to your computer and use it in GitHub Desktop.

Select an option

Save msr1k/ae727ad553c0b77a16668135f223a1b4 to your computer and use it in GitHub Desktop.
C++ async_queue.hpp
#ifndef ASYNC_QUEUE_HPP
#define ASYNC_QUEUE_HPP
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
// async_queue is an asynchronous queue implementation.
//
// If you call pull when it is empty, pull method waits untill something is pushed to it.
//
template <class T>
class async_queue {
public:
async_queue() : m(), cv(), q() {}
void push(T& t) {
{
std::unique_lock<std::mutex> l(m);
q.push(t);
}
cv.notify_one();
}
T pull() {
std::unique_lock<std::mutex> l(m);
if (!q.empty()) {
return pop();
}
auto that = this;
cv.wait(l, [that]{ return !that->q.empty(); });
return pop();
}
private:
T pop() {
T t = q.front();
q.pop();
return t;
}
private:
std::mutex m;
std::condition_variable cv;
std::queue<T> q;
};
// async_queue_source is a source of a async_queue.
//
// What all you can do is just set to the queue.
//
template <class T>
class async_queue_source {
private:
explicit async_queue_source(std::shared_ptr<async_queue<T>>& q) : q(q) {}
public:
void set(T& t) {
q->push(t);
}
public:
static std::unique_ptr<async_queue_source<T>> create(std::shared_ptr<async_queue<T>>& q) {
return std::unique_ptr<async_queue_source<T>>(new async_queue_source<T>(q));
}
private:
std::shared_ptr<async_queue<T>> q;
};
// async_queue_sink is a sink of a async_queue.
//
// What all you can do is just get from the queue.
//
template <class T>
class async_queue_sink {
private:
explicit async_queue_sink(std::shared_ptr<async_queue<T>>& q) : q(q) {}
public:
T get() {
return q->pull();
}
public:
static std::unique_ptr<async_queue_sink<T>> create(std::shared_ptr<async_queue<T>>& q) {
return std::unique_ptr<async_queue_sink<T>>(new async_queue_sink<T>(q));
}
private:
std::shared_ptr<async_queue<T>> q;
};
#endif // ASYNC_QUEUE_HPP
#include "async_queue.hpp"
#include <memory>
#include <mutex>
#include <thread>
#include <iostream>
void print(int i, int j) {
static std::mutex m;
std::lock_guard<std::mutex> l(m);
std::cout << "t" << i << ": " << j << std::endl;
}
int main(int argc, char const* argv[])
{
auto q = std::make_shared<async_queue<int>>();
auto so = async_queue_source<int>::create(q);
std::shared_ptr<async_queue_sink<int>> si = async_queue_sink<int>::create(q);
std::thread t1([si]() {
while(true) {
int i = si->get();
print(1, i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
std::thread t2([si]() {
while(true) {
int i = si->get();
print(2, i);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
});
t1.detach();
t2.detach();
for (int i = 0; i < 100; i++) {
so->set(i);
}
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment