Skip to content

Instantly share code, notes, and snippets.

@derbroti
Created May 19, 2023 18:20
Show Gist options
  • Select an option

  • Save derbroti/54e316c79b3cd7d6d02b6dc01874ca39 to your computer and use it in GitHub Desktop.

Select an option

Save derbroti/54e316c79b3cd7d6d02b6dc01874ca39 to your computer and use it in GitHub Desktop.
C++ demo for lock-free parallel task execution with dynamic "checkpoint" synchronization (catch-up and roll-back).
// MIT License. Copyright 2023 Mirko Palmer (derbroti)
// Build:
// g++ atomic_runner.cpp --std c++20 -o atomic_runner
//
// On:
// Apple clang version 14.0.3 (clang-1403.0.22.14.1)
// Target: arm64-apple-darwin22.4.0
// Thread model: posix
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <iomanip>
#include <iostream>
#include <thread>
// Number of worker threads; also the size of every per-worker array below.
constexpr int worker_cnt{10};

// Sync-round coordination flags.
// `sync`    - test_and_set by the worker that becomes coordinator of a sync
//             round; cleared by that worker when the round is over.
//             NOTE(review): this global name may collide with POSIX ::sync()
//             if <unistd.h> is pulled in by a standard header - confirm on
//             the target toolchain.
// `stopped` - polled at the top of each worker iteration; once set, workers
//             leave their loop.
// `cont`    - cleared when a sync round starts and set when it ends; workers
//             that finished their part of the round block on it.
std::atomic_flag sync = ATOMIC_FLAG_INIT;
std::atomic_flag stopped = ATOMIC_FLAG_INIT;
std::atomic_flag cont = ATOMIC_FLAG_INIT;

// Task number every worker has to reach in the current round (-1: none published).
std::atomic<int> tgt_task{-1};
// How many workers have arrived at the rendezvous of the current round.
std::atomic<int> ready_for_sync{0};

// Per-worker state, indexed by worker id.
long earliest_sync_request[worker_cnt]; // filled with -1 by main() before the workers start
long task[worker_cnt];                  // filled with 1 by main() before the workers start
long task_before_sync[worker_cnt]{};    // zeroed here; only feeds the debug tables
// Moves worker `id` to task number `tgt_task` by overwriting its task counter.
// DEMO - stand-in for the real roll-back code.
// Note: the parameter `tgt_task` hides the global atomic of the same name
// inside this function.
void step_back(int id, long tgt_task) {
    long& current = task[id];
    current = tgt_task;
}
// Advances worker `id` by exactly one task.
// DEMO - stand-in for the real task execution code.
void step(int id) {
    ++task[id];
}
// Reports whether worker `id` hits a sync condition at its current task.
// DEMO - stand-in for the real sync condition.
//   cheat == false: even-numbered workers request a sync on every task number
//                   divisible by 25.
//   cheat == true : any worker requests a sync five tasks earlier
//                   ((task + 5) divisible by 25); it exists only to force
//                   earlier sync requests.
bool need_sync(int id, bool cheat) {
    const long current = task[id];
    return cheat ? ((current + 5) % 25 == 0)
                 : ((id % 2 == 0) && (current % 25 == 0));
}
// Executes the actual sync action; run() calls it once all workers have been
// moved to the same task.
// DEMO - only prints a marker line (preceded by an empty line).
// NOTE:
//   the run can be terminated from here with:
//   stopped.test_and_set(std::memory_order_acquire);
void do_sync() {
    std::cout << std::endl;
    std::cout << "do the sync" << std::endl;
}
void run(int id) {
while(! stopped.test(std::memory_order_relaxed)) {
step(id);
///////////////////
// DEMO - remove
int rando = std::rand()/((RAND_MAX + 1u)/75);
std::this_thread::sleep_for(std::chrono::milliseconds(5 + rando));
// DEMO - till here
///////////////////
if (need_sync(id, 0)) {
// enter only if the flag was _not_ set by someone else before
if (! sync.test_and_set(std::memory_order_acquire)) {
// clear continue here, so all threads have a chance to catch the trigger
// should prevent ABA thread sync problem
cont.clear();
tgt_task.store(task[id]);
tgt_task.notify_all();
//DEMO - for debug prints
task_before_sync[id] = task[id]; // we issue the sync, so our task count is (for now)
// correct (see earliest_sync code below)
// In case any worker requires an earlier sync than us, we have to roll-back
// everyone to the earliest of the syncs (See code below)
// note:
// we have to explicitly set ourselves as ready, so even if we are the last one to
// do so, the wait condition will trigger - otherwise we would be stuck there
int rdy = ready_for_sync.fetch_add(1);
do {
ready_for_sync.wait(rdy);
rdy = ready_for_sync.load();
} while (rdy < worker_cnt);
long earliest_sync = -1;
std::cout << "want to sync, all workers: work till task: " << task[id] << std::endl << std::endl;
std::cout << "id task_before task_now sync_req" << std::endl;
for (int i = 0; i < worker_cnt; ++i) {
long esr = earliest_sync_request[i];
// DEMO - debug print
std::cout << std::setw(2) << i
<< std::setw(12) << task_before_sync[i]
<< std::setw(9) << task[i]
<< std::setw(9) << esr
<< std::endl;
//
if (esr != -1) {
if (earliest_sync == -1 || esr < earliest_sync)
earliest_sync = esr;
}
earliest_sync_request[i] = -1;
}
//////////////////
// DEMO - remove
for (int id = 0; id < worker_cnt; ++id) {
task_before_sync[id] = task[id];
}
// DEMO - till here
///////////////////
// unless we encounter an earier sync request from another worker
// (i.e. a slower one encounters an earlier sync while catching up)
// we do not have to do anything here, as everyone is now done the same
// n-th task already
// (See worker sync code from down below)
if (earliest_sync != -1) {
std::thread t[worker_cnt];
for (int i = 0; i < worker_cnt; ++i)
t[i] = std::thread(step_back, i, earliest_sync);
for (auto &i: t)
i.join();
}
//////////////////
// DEMO - remove
std::cout << std::endl << "Move workers to earliest sync task: " << earliest_sync << std::endl << std::endl;
std::cout << "id task_before task_now" << std::endl;
for (int i = 0; i < worker_cnt; ++i) {
std::cout << std::setw(2) << i
<< std::setw(12) << task_before_sync[i]
<< std::setw(9) << task[i]
<< std::endl;
}
// DEMO - till here
///////////////////
// at this point all worker are synced and waiting at the same task
do_sync();
// reset everything for next sync trigger
ready_for_sync.store(0);
sync.clear();
tgt_task.store(-1);
earliest_sync = -1;
// notify all workers to continue to run
cont.test_and_set(std::memory_order_acquire);
cont.notify_all();
//DEMO - debug print
std::cout << "\n----------\n" << std::endl;
}
}
// Another worker requests a sync
if (sync.test(std::memory_order_relaxed)) {
// make sure we wait until the target task number is set (i.e. tgt_task != -1)
tgt_task.wait(-1);
/////////////////
// DEMO - remove
int rando = std::rand()/((RAND_MAX + 1u)/100);
std::this_thread::sleep_for(std::chrono::milliseconds(rando));
// DEMO - till here
///////////////////
// Dependend on how many more or less tasks we executed,
// execute either additional tasks or rollback to an earlier task
// NOTE:
// When executing more tasks, we have to set earliest_sync_request[id] if we
// encounter a sync condition
// When rolling back, we do not have to care as there was no sync condition
// when executing the tasks the first time
auto tgt = tgt_task.load();
if (task[id] > tgt) {
step_back(id, tgt);
} else {
while (task[id] < tgt) {
step(id);
if (need_sync(id, 1)) {
earliest_sync_request[id] = task[id];
break;
}
}
}
// At this point we have moved to the, until now, correct target task and we wait.
// Once every worker did this, the sync requesting worker will be evaluated
// and every worker is moved to the earliest task that required a sync
ready_for_sync.fetch_add(1);
ready_for_sync.notify_all();
// At this point the sync requesting worker is done executing the earliest sync
// and every worker can continue to work
// NOTE:
// To allow dynamic task sync conditions, we only act on the earliest sync, as
// that might change future sync requests
cont.wait(false);
}
}
}
int main() {
std::srand(std::time(nullptr));
std::fill_n(earliest_sync_request, worker_cnt, -1);
std::fill_n(task, worker_cnt, 1);
std::thread t[worker_cnt];
int id = 0;
for (auto& i: t) {
i = std::thread(run, id++);
// DEMO - try to start tasks out of sync
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
for (auto &i: t)
i.join();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment