Last active
January 17, 2024 01:12
-
-
Save jstefanelli/2e3afa7ad3c5bc376bd6c16f34d2b095 to your computer and use it in GitHub Desktop.
Nth attempt at lock-free coding: a bounded lock-free queue with a rudimentary test attached
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #ifndef CONCURRENCY_EXPERIMENT_QUEUE_H | |
| #define CONCURRENCY_EXPERIMENT_QUEUE_H | |
#include <atomic>
#include <cstdint>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>
// Read and write cursors of the ring buffer, overlaid on a single 64-bit word
// so that both can be loaded and compare-exchanged together.
//
// NOTE(review): accessing `compound` after writing `read`/`write` (or the other
// way round) relies on the compiler supporting anonymous structs and union
// type punning; mainstream compilers do, ISO C++ does not guarantee it.
struct queue_idx {
    union {
        // Both cursors seen as one word.
        uint64_t compound;
        struct {
            // Index of the next slot to read.
            uint32_t read;
            // Index of the next slot to write.
            uint32_t write;
        };
    };

    // Both cursors start at slot 0.
    queue_idx() : compound(0) {}
};

// The overlay only works if the two cursors fill the word exactly and the
// platform can compare-exchange that word without taking a lock.
static_assert(sizeof(uint64_t) == sizeof(queue_idx));
static_assert(std::atomic_uint64_t::is_always_lock_free, "No 64-bit CAS");
// Status of a queue slot: two 16-bit flags overlaid on one 32-bit word.
//
// NOTE(review): reading `compound` after setting the flags (or vice versa)
// relies on compiler support for anonymous structs and union type punning.
union entry_status {
    // Both flags seen as one word.
    uint32_t compound;
    struct {
        uint16_t filled; // 0 = empty, 1 = valid data
        uint16_t busy;   // 0 = free to use, 1 = busy
    };
};

// The flags must cover the word exactly for the overlay to be meaningful.
static_assert(sizeof(uint32_t) == sizeof(entry_status));
static_assert(std::atomic_uint32_t::is_always_lock_free, "No 32-bit CAS");

// One slot of the queue: a payload guarded by an atomic status word.
//
// The status word cycles through four states:
//
//     empty -> (writer busy) -> filled -> (reader busy) -> empty
//
// write() claims an empty slot and read() claims a filled one. Each call spins
// until the slot reaches the state it needs, so only one thread touches `data`
// at a time. The states are explicit bit constants, so the class does not
// depend on union type punning.
template<typename T>
class queue_entry {
private:
    static constexpr uint32_t empty_state = 0;      // no data, nobody working on the slot
    static constexpr uint32_t filled_bit = 1u;      // slot holds valid data
    static constexpr uint32_t busy_bit = 1u << 16;  // a reader or writer currently owns the slot

    std::atomic_uint32_t hazard{empty_state};
    T data{};

    // Spins until the status word has been switched atomically from `from` to `to`.
    void claim(uint32_t from, uint32_t to) {
        uint32_t expected = from;
        // compare_exchange_weak overwrites `expected` with the observed value
        // on failure (and may fail spuriously), so it is reset before retrying.
        while (!hazard.compare_exchange_weak(expected, to)) {
            expected = from;
        }
    }

public:
    queue_entry() = default;

    // Moves payload and status out of `other`. Not synchronised with
    // concurrent read()/write() calls: it exists so entries can live in a
    // std::vector and must only run before the entry is shared between threads.
    // noexcept (when T allows it) lets containers relocate entries safely.
    queue_entry(queue_entry&& other) noexcept(std::is_nothrow_move_constructible_v<T>)
        : hazard(other.hazard.load()), data(std::move(other.data)) {
    }

    // The atomic status word makes entries non-copyable and non-assignable.
    queue_entry(const queue_entry&) = delete;
    queue_entry& operator=(const queue_entry&) = delete;
    queue_entry& operator=(queue_entry&&) = delete;

    // Takes the payload out of the slot and marks the slot empty.
    // Blocks (spinning) until the slot is filled and not owned by another
    // thread, so only call it for a slot that has been, or is about to be, written.
    T read() {
        claim(filled_bit, filled_bit | busy_bit);
        T val = std::move(data);
        // The busy state is owned exclusively by this thread, so a plain store
        // is enough to release the slot; no CAS loop is needed.
        hazard.store(empty_state);
        return val;
    }

    // Moves `val` into the slot and marks the slot filled.
    // Blocks (spinning) until the slot is empty and not owned by another thread.
    void write(T&& val) {
        claim(empty_state, busy_bit);
        data = std::move(val);
        // Exclusive owner of the busy state: publish the data with a plain store.
        hazard.store(filled_bit);
    }
};
| template<typename T> | |
| class bounded_queue { | |
| private: | |
| std::vector<queue_entry<T>> _data; | |
| std::atomic_uint64_t _idx; | |
| private: | |
| bool get_insert_idx(queue_idx& ptr) { | |
| ptr.compound = _idx.load(); | |
| queue_idx nx; | |
| do { | |
| nx = ptr; | |
| nx.write = next(ptr.write); | |
| if (nx.write == nx.read) { //queue is full | |
| return false; | |
| } | |
| } while(!_idx.compare_exchange_weak(ptr.compound, nx.compound)); | |
| return true; | |
| } | |
| bool get_read_idx(queue_idx& ptr) { | |
| ptr.compound = _idx.load(); | |
| queue_idx nx; | |
| do { | |
| nx = ptr; | |
| if (nx.read == nx.write) { //queue is empty | |
| return false; | |
| } | |
| nx.read = next(ptr.read); | |
| } while(!_idx.compare_exchange_weak(ptr.compound, nx.compound)); | |
| return true; | |
| } | |
| inline uint32_t next(uint32_t current) const { | |
| return ++current % size(); | |
| } | |
| public: | |
| explicit bounded_queue(uint32_t capacity) { | |
| _data.resize(capacity); | |
| } | |
| bounded_queue() : bounded_queue(32) { | |
| } | |
| std::enable_if<std::is_copy_assignable_v<T>, bool>::type insert(const T& item) { | |
| queue_idx ptr; | |
| if (get_insert_idx(ptr)) { | |
| auto copy = item; | |
| _data[ptr.write].write(std::move(copy)); | |
| return true; | |
| } | |
| return false; | |
| } | |
| bool insert(T&& item) { | |
| queue_idx ptr; | |
| if (get_insert_idx(ptr)) { | |
| _data[ptr.write].write(std::move(item)); | |
| return true; | |
| } | |
| return false; | |
| } | |
| std::optional<T> read() { | |
| queue_idx ptr; | |
| if (get_read_idx(ptr)) { | |
| std::optional op = _data[ptr.read].read(); | |
| return op; | |
| } | |
| return std::nullopt; | |
| } | |
| inline uint32_t size() const { | |
| return static_cast<uint32_t>(_data.size()); | |
| } | |
| }; | |
| #endif //CONCURRENCY_EXPERIMENT_QUEUE_H |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include "queue.h" | |
| #include <thread> | |
| #include <mutex> | |
| #include <iostream> | |
| #include <algorithm> | |
| #include <semaphore> | |
| #include <condition_variable> | |
| int main() { | |
| constexpr uint64_t numbers_per_thread = 1024; | |
| constexpr uint64_t num_threads = 64; | |
| bounded_queue<uint64_t> queue(numbers_per_thread * num_threads); | |
| std::atomic_uint64_t read_items_count = 0; | |
| std::vector<uint64_t> read_items; | |
| read_items.reserve(num_threads * numbers_per_thread); | |
| std::mutex read_items_mutex; | |
| std::mutex out_mutex; | |
| std::vector<std::thread> threads; | |
| threads.reserve(64); | |
| std::mutex start_mutex; | |
| std::atomic_uint32_t ready_count = 0; | |
| std::condition_variable start_variable; | |
| for(uint64_t i = 0; i < num_threads; i++) { | |
| if (i % 2 == 0) { | |
| threads.emplace_back([i, &out_mutex, &queue, &start_mutex, &ready_count, &start_variable]() -> void { | |
| auto started = ready_count.fetch_add(1); | |
| if (started == num_threads - 1) { | |
| start_variable.notify_all(); | |
| } else { | |
| std::unique_lock lock(start_mutex); | |
| start_variable.wait(lock); | |
| } | |
| uint64_t start = (i / 2) * numbers_per_thread; | |
| for(uint64_t x = 0; x < numbers_per_thread; x++) { | |
| if(!queue.insert(start + x)) { | |
| std::lock_guard guard(out_mutex); | |
| std::cout << "Failed to insert number " << start + x << " from thread " << i << "." << std::endl; | |
| } | |
| } | |
| }); | |
| } else { | |
| threads.emplace_back([i, &out_mutex, &read_items_mutex, &read_items, &read_items_count, &queue, &ready_count, &start_variable, &start_mutex]() -> void { | |
| std::vector<uint64_t> items; | |
| items.reserve(numbers_per_thread * num_threads); | |
| int tries = 0; | |
| int current_read = 0; | |
| int highest_try = 0; | |
| auto started = ready_count.fetch_add(1); | |
| if (started == num_threads - 1) { | |
| start_variable.notify_all(); | |
| } else { | |
| std::unique_lock lock(start_mutex); | |
| start_variable.wait(lock); | |
| } | |
| do { | |
| auto item = queue.read(); | |
| if (!item.has_value()) { | |
| auto read = read_items_count.fetch_add(current_read); | |
| read += current_read; | |
| tries++; | |
| if (tries > highest_try) { | |
| highest_try = tries; | |
| } | |
| current_read = 0; | |
| if (read == (num_threads / 2) * numbers_per_thread) { | |
| break; | |
| } | |
| } else { | |
| items.push_back(*item); | |
| tries = 0; | |
| current_read++; | |
| } | |
| } while(true); | |
| { | |
| std::lock_guard guard(read_items_mutex); | |
| read_items.insert(read_items.end(), items.begin(), items.end()); | |
| } | |
| { | |
| std::lock_guard guard(out_mutex); | |
| std::cout << "Read Thread " << i << " done with " << highest_try << " highest stall tries." << std::endl; | |
| } | |
| }); | |
| } | |
| } | |
| for(auto& t : threads) { | |
| t.join(); | |
| } | |
| for(uint64_t i = 0; i < (num_threads / 2) * numbers_per_thread; i++) { | |
| if (std::find(read_items.begin(), read_items.end(), i) == read_items.end()) { | |
| std::cout << "Missing entry: " << i << std::endl; | |
| } | |
| } | |
| return 0; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment