Skip to content

Instantly share code, notes, and snippets.

@jstefanelli
Last active January 17, 2024 01:12
Show Gist options
  • Select an option

  • Save jstefanelli/2e3afa7ad3c5bc376bd6c16f34d2b095 to your computer and use it in GitHub Desktop.

Select an option

Save jstefanelli/2e3afa7ad3c5bc376bd6c16f34d2b095 to your computer and use it in GitHub Desktop.
Nth try at some lock-free coding: a bounded lock-free queue with a rudimentary test attached
#ifndef CONCURRENCY_EXPERIMENT_QUEUE_H
#define CONCURRENCY_EXPERIMENT_QUEUE_H
#include <atomic>
#include <cstdint>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>
/**
 * Read/write cursor pair packed into one 64-bit word.
 *
 * Both cursors can be loaded and compare-exchanged together through
 * `compound`, while `read` and `write` give access to the two 32-bit halves.
 * A default-constructed value has the whole word set to zero.
 */
struct queue_idx {
    union {
        // The two cursors viewed as a single word.
        uint64_t compound;
        struct {
            // Index of the next slot to read from.
            uint32_t read;
            // Index of the next slot to write to.
            uint32_t write;
        };
    };

    queue_idx() : compound{0} {}
};
static_assert(sizeof(queue_idx) == sizeof(uint64_t));
static_assert(std::atomic_uint64_t::is_always_lock_free, "No 64-bit CAS");
/**
 * State word of a single queue slot, as stored in queue_entry's atomic.
 *
 * `compound` is the value that is compare-exchanged; `filled` and `busy` are
 * the two 16-bit halves of it.
 */
union entry_status {
    uint32_t compound;
    struct {
        uint16_t filled; // 0 = empty, 1 = valid data
        uint16_t busy;   // 0 = free to use, 1 = busy
    };
};
static_assert(sizeof(entry_status) == sizeof(uint32_t));

/**
 * One slot of the queue: a payload guarded by an atomic status word.
 *
 * A slot cycles through four states:
 *   (empty, free) -> (empty, busy) -> (filled, free) -> (filled, busy) -> (empty, free)
 * write() performs the first two transitions, read() the last two. Entering a
 * busy state is done with a compare-exchange that only succeeds from the
 * matching free state, so at most one thread is ever inside a busy state.
 * Both read() and write() spin until the slot reaches the state they need.
 *
 * @tparam T payload type; must be default-constructible and movable.
 */
template<typename T>
class queue_entry {
private:
    std::atomic_uint32_t hazard{0};
    // Value-initialised so the payload is never indeterminate, even for trivial T.
    T data{};

    // Packs a (filled, busy) pair into the 32-bit word stored in `hazard`.
    static uint32_t make_status(uint16_t filled, uint16_t busy) noexcept {
        entry_status status{};
        status.filled = filled;
        status.busy = busy;
        return status.compound;
    }

    // Spins until the status word could be switched from `from` to `to`.
    // compare_exchange_weak overwrites `expected` on failure, hence the reset.
    void acquire(uint32_t from, uint32_t to) noexcept {
        uint32_t expected = from;
        while (!hazard.compare_exchange_weak(expected, to)) {
            expected = from;
        }
    }

public:
    queue_entry() = default;

    /**
     * Moves payload and status word out of `other`.
     *
     * Not synchronised with concurrent read()/write() on `other`; intended
     * only for relocating entries before the queue is shared between threads.
     */
    queue_entry(queue_entry&& other) noexcept(std::is_nothrow_move_constructible_v<T>)
        : hazard(other.hazard.load()), data(std::move(other.data)) {
    }

    /**
     * Takes the payload out of the slot and marks the slot empty.
     *
     * Blocks (spinning) until the slot is filled and not busy.
     *
     * @return the stored value, moved out of the slot.
     */
    T read() {
        acquire(make_status(1, 0), make_status(1, 1));
        T val = std::move(data);
        // While busy is set every competing acquire() fails, so this thread is
        // the only one able to change the word: a plain store releases the slot.
        hazard.store(make_status(0, 0));
        return val;
    }

    /**
     * Stores `val` in the slot and marks the slot filled.
     *
     * Blocks (spinning) until the slot is empty and not busy.
     *
     * @param val value to move into the slot.
     */
    void write(T&& val) {
        acquire(make_status(0, 0), make_status(0, 1));
        data = std::move(val);
        // Same reasoning as in read(): the busy flag gives exclusive ownership.
        hazard.store(make_status(1, 0));
    }
};
static_assert(std::atomic_uint32_t::is_always_lock_free, "No 32-bit CAS");
template<typename T>
class bounded_queue {
private:
std::vector<queue_entry<T>> _data;
std::atomic_uint64_t _idx;
private:
bool get_insert_idx(queue_idx& ptr) {
ptr.compound = _idx.load();
queue_idx nx;
do {
nx = ptr;
nx.write = next(ptr.write);
if (nx.write == nx.read) { //queue is full
return false;
}
} while(!_idx.compare_exchange_weak(ptr.compound, nx.compound));
return true;
}
bool get_read_idx(queue_idx& ptr) {
ptr.compound = _idx.load();
queue_idx nx;
do {
nx = ptr;
if (nx.read == nx.write) { //queue is empty
return false;
}
nx.read = next(ptr.read);
} while(!_idx.compare_exchange_weak(ptr.compound, nx.compound));
return true;
}
inline uint32_t next(uint32_t current) const {
return ++current % size();
}
public:
explicit bounded_queue(uint32_t capacity) {
_data.resize(capacity);
}
bounded_queue() : bounded_queue(32) {
}
std::enable_if<std::is_copy_assignable_v<T>, bool>::type insert(const T& item) {
queue_idx ptr;
if (get_insert_idx(ptr)) {
auto copy = item;
_data[ptr.write].write(std::move(copy));
return true;
}
return false;
}
bool insert(T&& item) {
queue_idx ptr;
if (get_insert_idx(ptr)) {
_data[ptr.write].write(std::move(item));
return true;
}
return false;
}
std::optional<T> read() {
queue_idx ptr;
if (get_read_idx(ptr)) {
std::optional op = _data[ptr.read].read();
return op;
}
return std::nullopt;
}
inline uint32_t size() const {
return static_cast<uint32_t>(_data.size());
}
};
#endif //CONCURRENCY_EXPERIMENT_QUEUE_H
#include "queue.h"
#include <thread>
#include <mutex>
#include <iostream>
#include <algorithm>
#include <semaphore>
#include <condition_variable>
/**
 * Stress test for bounded_queue.
 *
 * Starts 64 threads: even-numbered ones each insert a disjoint range of 1024
 * numbers, odd-numbered ones drain the queue until the shared counter shows
 * that every number has been read. Afterwards every expected number that was
 * never read is reported on stdout.
 *
 * @return always 0.
 */
int main() {
    constexpr uint64_t numbers_per_thread = 1024;
    constexpr uint64_t num_threads = 64;
    // Half of the threads are producers.
    constexpr uint64_t total_items = (num_threads / 2) * numbers_per_thread;

    bounded_queue<uint64_t> queue(numbers_per_thread * num_threads);
    std::atomic_uint64_t read_items_count = 0;
    std::vector<uint64_t> read_items;
    read_items.reserve(num_threads * numbers_per_thread);
    std::mutex read_items_mutex;
    std::mutex out_mutex;
    std::vector<std::thread> threads;
    threads.reserve(num_threads);

    // Start gate: every worker blocks here until all of them have arrived.
    // The counter is only touched under start_mutex and the wait uses a
    // predicate, so neither a notification sent before a thread starts
    // waiting nor a spurious wake-up can break the rendezvous.
    std::mutex start_mutex;
    std::condition_variable start_variable;
    uint64_t ready_count = 0;
    const auto wait_for_start = [&start_mutex, &start_variable, &ready_count]() {
        std::unique_lock lock(start_mutex);
        if (++ready_count == num_threads) {
            start_variable.notify_all();
        } else {
            start_variable.wait(lock, [&ready_count] { return ready_count == num_threads; });
        }
    };

    for (uint64_t i = 0; i < num_threads; i++) {
        if (i % 2 == 0) {
            // Producer: inserts [start, start + numbers_per_thread).
            threads.emplace_back([i, &out_mutex, &queue, &wait_for_start]() -> void {
                wait_for_start();
                const uint64_t start = (i / 2) * numbers_per_thread;
                for (uint64_t x = 0; x < numbers_per_thread; x++) {
                    if (!queue.insert(start + x)) {
                        std::lock_guard guard(out_mutex);
                        std::cout << "Failed to insert number " << start + x << " from thread " << i << "." << std::endl;
                    }
                }
            });
        } else {
            // Consumer: reads until the global counter reaches total_items.
            threads.emplace_back([i, &out_mutex, &read_items_mutex, &read_items, &read_items_count, &queue, &wait_for_start]() -> void {
                std::vector<uint64_t> items;
                items.reserve(total_items);
                uint64_t tries = 0;        // consecutive empty reads
                uint64_t highest_try = 0;  // longest run of empty reads seen
                uint64_t current_read = 0; // items read since the last publish
                wait_for_start();
                while (true) {
                    auto item = queue.read();
                    if (item.has_value()) {
                        items.push_back(*item);
                        tries = 0;
                        current_read++;
                        continue;
                    }
                    // Queue looked empty: publish the local progress and check
                    // whether all consumers together have read everything.
                    const uint64_t total_read = read_items_count.fetch_add(current_read) + current_read;
                    current_read = 0;
                    tries++;
                    if (tries > highest_try) {
                        highest_try = tries;
                    }
                    if (total_read == total_items) {
                        break;
                    }
                }
                {
                    std::lock_guard guard(read_items_mutex);
                    read_items.insert(read_items.end(), items.begin(), items.end());
                }
                {
                    std::lock_guard guard(out_mutex);
                    std::cout << "Read Thread " << i << " done with " << highest_try << " highest stall tries." << std::endl;
                }
            });
        }
    }

    for (auto& t : threads) {
        t.join();
    }

    // Sort once so that each membership check is a binary search instead of a
    // linear scan over all read items.
    std::sort(read_items.begin(), read_items.end());
    for (uint64_t i = 0; i < total_items; i++) {
        if (!std::binary_search(read_items.begin(), read_items.end(), i)) {
            std::cout << "Missing entry: " << i << std::endl;
        }
    }
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment