Created
May 19, 2023 18:20
-
-
Save derbroti/54e316c79b3cd7d6d02b6dc01874ca39 to your computer and use it in GitHub Desktop.
C++ demo for lock-free parallel task execution with dynamic "checkpoint" synchronization (catch-up and roll-back).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // MIT License. Copyright 2023 Mirko Palmer (derbroti) | |
| // Build: | |
| // g++ atomic_runner.cpp --std c++20 -o atomic_runner | |
| // | |
| // On: | |
| // Apple clang version 14.0.3 (clang-1403.0.22.14.1) | |
| // Target: arm64-apple-darwin22.4.0 | |
| // Thread model: posix | |
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <iomanip>
#include <iostream>
#include <thread>
// Number of worker threads spawned by main().
const int worker_cnt = 10;
// Won by the first worker whose sync condition fires; while set, a sync
// round is in progress and all other workers act as followers.
std::atomic_flag sync = ATOMIC_FLAG_INIT;
// Set to terminate every worker's run loop (see the note in do_sync()).
std::atomic_flag stopped = ATOMIC_FLAG_INIT;
// Cleared while a sync round is pending; set + notified when workers may resume.
std::atomic_flag cont = ATOMIC_FLAG_INIT;
// Task number all workers must catch up / roll back to; -1 means "not published yet".
std::atomic<int> tgt_task = -1;
// Count of workers that have reached the sync rendezvous point.
std::atomic<int> ready_for_sync = 0;
// NOTE(review): the three arrays below are plain longs read and written from
// several threads; the design relies on the sync/cont handshake to order the
// accesses rather than on per-element atomics — confirm this is intentional.
long earliest_sync_request[worker_cnt]; //initialized to -1
long task[worker_cnt]; //initialized to 1
long task_before_sync[worker_cnt] = {0};
| void step_back(int id, long tgt_task) { | |
| // DEMO - replace with actual task roll back code here | |
| task[id] = tgt_task; | |
| } | |
| void step(int id) { | |
| // DEMO - replace with actual task execution code here | |
| task[id] += 1; | |
| } | |
| bool need_sync(int id, bool cheat) { // cheat argument is only here to force earlier sync requesst | |
| // DEMO - replace with sync condition | |
| if (cheat) { | |
| return (task[id] +5) % 25 == 0; | |
| } | |
| return (task[id] % 25 == 0) && (id % 2 == 0); | |
| } | |
// DEMO - placeholder for the real synchronization work: just announce it.
void do_sync() {
    std::cout << std::endl;
    std::cout << "do the sync" << std::endl;
    // NOTE:
    // terminate the run here with:
    // stopped.test_and_set(std::memory_order_acquire);
}
// Worker thread body. Executes tasks until `stopped` is set. Per iteration a
// worker can take one of two roles:
//  * sync requester: the first worker whose need_sync() fires wins the `sync`
//    flag, publishes its task number as the target, waits for every worker to
//    rendezvous, resolves the earliest requested sync (rolling everyone back
//    if a follower hit an earlier one), performs the sync, then releases all.
//  * follower: sees `sync` set, catches up (or rolls back) to the published
//    target, records any earlier sync condition it hits on the way, and then
//    waits on `cont` until the requester is done.
void run(int id) {
    while(! stopped.test(std::memory_order_relaxed)) {
        step(id);
        ///////////////////
        // DEMO - remove: random per-task delay so workers drift apart
        int rando = std::rand()/((RAND_MAX + 1u)/75);
        std::this_thread::sleep_for(std::chrono::milliseconds(5 + rando));
        // DEMO - till here
        ///////////////////
        if (need_sync(id, 0)) {
            // enter only if the flag was _not_ set by someone else before
            if (! sync.test_and_set(std::memory_order_acquire)) {
                // clear `cont` BEFORE publishing tgt_task, so every thread has
                // a chance to catch the trigger - should prevent the ABA
                // thread-sync problem
                cont.clear();
                tgt_task.store(task[id]);
                tgt_task.notify_all();
                //DEMO - for debug prints
                task_before_sync[id] = task[id]; // we issue the sync, so our task count is (for now)
                                                 // correct (see earliest_sync code below)
                // In case any worker requires an earlier sync than us, we have
                // to roll back everyone to the earliest of the syncs (see below).
                // note:
                // we have to explicitly count ourselves as ready, so even if we
                // are the last one to do so the wait condition will trigger -
                // otherwise we would be stuck in the wait loop below
                int rdy = ready_for_sync.fetch_add(1);
                do {
                    ready_for_sync.wait(rdy);
                    rdy = ready_for_sync.load();
                } while (rdy < worker_cnt);
                // all workers have rendezvoused; find the earliest sync request
                long earliest_sync = -1;
                std::cout << "want to sync, all workers: work till task: " << task[id] << std::endl << std::endl;
                std::cout << "id task_before task_now sync_req" << std::endl;
                for (int i = 0; i < worker_cnt; ++i) {
                    long esr = earliest_sync_request[i];
                    // DEMO - debug print
                    std::cout << std::setw(2) << i
                        << std::setw(12) << task_before_sync[i]
                        << std::setw(9) << task[i]
                        << std::setw(9) << esr
                        << std::endl;
                    //
                    if (esr != -1) {
                        if (earliest_sync == -1 || esr < earliest_sync)
                            earliest_sync = esr;
                    }
                    earliest_sync_request[i] = -1; // consume the request for the next round
                }
                //////////////////
                // DEMO - remove
                // NOTE(review): this loop variable shadows the `id` parameter.
                for (int id = 0; id < worker_cnt; ++id) {
                    task_before_sync[id] = task[id];
                }
                // DEMO - till here
                ///////////////////
                // unless we encountered an earlier sync request from another
                // worker (i.e. a slower one hit an earlier sync while catching
                // up) we do not have to do anything here, as everyone has done
                // the same n-th task already
                // (see the follower code further down)
                if (earliest_sync != -1) {
                    std::thread t[worker_cnt];
                    for (int i = 0; i < worker_cnt; ++i)
                        t[i] = std::thread(step_back, i, earliest_sync);
                    for (auto &i: t)
                        i.join();
                }
                //////////////////
                // DEMO - remove
                std::cout << std::endl << "Move workers to earliest sync task: " << earliest_sync << std::endl << std::endl;
                std::cout << "id task_before task_now" << std::endl;
                for (int i = 0; i < worker_cnt; ++i) {
                    std::cout << std::setw(2) << i
                        << std::setw(12) << task_before_sync[i]
                        << std::setw(9) << task[i]
                        << std::endl;
                }
                // DEMO - till here
                ///////////////////
                // at this point all workers are synced and waiting at the same task
                do_sync();
                // reset everything for the next sync trigger - this must happen
                // before cont is set, since followers resume right after cont
                ready_for_sync.store(0);
                sync.clear();
                tgt_task.store(-1);
                earliest_sync = -1; // NOTE(review): redundant - the local goes out of scope just below
                // notify all workers to continue to run
                cont.test_and_set(std::memory_order_acquire);
                cont.notify_all();
                //DEMO - debug print
                std::cout << "\n----------\n" << std::endl;
            }
        }
        // Another worker requested a sync: act as a follower
        if (sync.test(std::memory_order_relaxed)) {
            // make sure we wait until the target task number is set (i.e. tgt_task != -1)
            tgt_task.wait(-1);
            /////////////////
            // DEMO - remove: random delay so both catch-up and roll-back occur
            int rando = std::rand()/((RAND_MAX + 1u)/100);
            std::this_thread::sleep_for(std::chrono::milliseconds(rando));
            // DEMO - till here
            ///////////////////
            // Depending on how many more or fewer tasks we have executed,
            // either execute additional tasks or roll back to an earlier task.
            // NOTE:
            // When executing more tasks, we have to set earliest_sync_request[id]
            // if we encounter a sync condition on the way.
            // When rolling back, we do not have to care, as there was no sync
            // condition when executing those tasks the first time.
            auto tgt = tgt_task.load();
            if (task[id] > tgt) {
                step_back(id, tgt);
            } else {
                while (task[id] < tgt) {
                    step(id);
                    if (need_sync(id, 1)) {
                        earliest_sync_request[id] = task[id];
                        break;
                    }
                }
            }
            // At this point we have moved to the (until now) correct target task and we wait.
            // Once every worker did this, the sync-requesting worker evaluates
            // the requests and moves every worker to the earliest task that
            // required a sync.
            ready_for_sync.fetch_add(1);
            ready_for_sync.notify_all();
            // Once cont is set, the sync-requesting worker is done executing
            // the earliest sync and every worker can continue to work.
            // NOTE:
            // To allow dynamic task sync conditions we only act on the earliest
            // sync, as it might change future sync requests.
            cont.wait(false);
        }
    }
}
| int main() { | |
| std::srand(std::time(nullptr)); | |
| std::fill_n(earliest_sync_request, worker_cnt, -1); | |
| std::fill_n(task, worker_cnt, 1); | |
| std::thread t[worker_cnt]; | |
| int id = 0; | |
| for (auto& i: t) { | |
| i = std::thread(run, id++); | |
| // DEMO - try to start tasks out of sync | |
| std::this_thread::sleep_for(std::chrono::milliseconds(50)); | |
| } | |
| for (auto &i: t) | |
| i.join(); | |
| return 0; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment