scheduler-numa
// EXAMPLE USE 1:
//
// fork_join_scheduler fj;
//
// long fib(long i) {
//   if (i <= 1) return 1;
//   long l,r;
//   fj.pardo([&] () { l = fib(i-1);},
//            [&] () { r = fib(i-2);});
//   return l + r;
// }
//
// fib(40);
//
// EXAMPLE USE 2:
//
// void init(long* x, size_t n) {
//   parfor(0, n, [&] (size_t i) { x[i] = i; });
// }
//
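// EXAMPLE USE 3 (SAGE build only; an editor's sketch of the NUMA additions
// below, not an upstream parlay API):
//
// fork_join_scheduler fj;
// int node = fj.numanode();           // NUMA node of the calling worker
// auto job = make_job([] () { /* work */ });
// fj.send(&job, 3);                   // best-effort push into worker 3's
//                                     // one-slot mailbox; silently dropped
//                                     // if the slot is already occupied
//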
#ifndef PARLAY_SCHEDULER_H_
#define PARLAY_SCHEDULER_H_

#include <cstdint>
#include <cstdlib>

#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>  // IWYU pragma: keep
#include <vector>

#include "internal/work_stealing_job.h"

// #define SAGE
#ifdef SAGE
#include <pthread.h>
#include <utmpx.h>
#include <numa.h>
#endif

namespace parlay {

inline bool cas(int* ptr, int oldv, int newv) {
  return __sync_bool_compare_and_swap(ptr, oldv, newv);
}

inline bool cas(size_t* ptr, size_t oldv, size_t newv) {
  return __sync_bool_compare_and_swap(ptr, oldv, newv);
}
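// The helpers above wrap the GCC/Clang __sync builtin. A portable
// equivalent (editor's sketch, assuming C++20 std::atomic_ref is
// available) would be:
//
//   std::atomic_ref<size_t> a(*ptr);
//   return a.compare_exchange_strong(oldv, newv);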
// Deque from Arora, Blumofe, and Plaxton (SPAA, 1998).
template <typename Job>
struct Deque {
  using qidx = unsigned int;
  using tag_t = unsigned int;

  // use std::atomic<age_t> for atomic access.
  // Note: Explicit alignment specifier required
  // to ensure that Clang inlines atomic loads.
  struct alignas(int64_t) age_t {
    tag_t tag;
    qidx top;
  };

  // align to avoid false sharing
  struct alignas(64) padded_job {
    std::atomic<Job*> job;
  };

  static constexpr int q_size = 1000;
  std::atomic<qidx> bot;
  std::atomic<age_t> age;
  std::array<padded_job, q_size> deq;

  Deque() : bot(0), age(age_t{0, 0}) {}

  void push_bottom(Job* job) {
    auto local_bot = bot.load(std::memory_order_relaxed);      // atomic load
    deq[local_bot].job.store(job, std::memory_order_relaxed);  // shared store
    local_bot += 1;
    if (local_bot == q_size) {
      throw std::runtime_error("internal error: scheduler queue overflow");
    }
    bot.store(local_bot, std::memory_order_relaxed);  // shared store
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }

  Job* pop_top() {
    Job* result = nullptr;
    auto old_age = age.load(std::memory_order_relaxed);    // atomic load
    auto local_bot = bot.load(std::memory_order_relaxed);  // atomic load
    if (local_bot > old_age.top) {
      auto job =
          deq[old_age.top].job.load(std::memory_order_relaxed);  // atomic load
      auto new_age = old_age;
      new_age.top = new_age.top + 1;
      if (age.compare_exchange_strong(old_age, new_age))
        result = job;
      else
        result = nullptr;
    }
    return result;
  }

  Job* pop_bottom() {
    Job* result = nullptr;
    auto local_bot = bot.load(std::memory_order_relaxed);  // atomic load
    if (local_bot != 0) {
      local_bot--;
      bot.store(local_bot, std::memory_order_relaxed);  // shared store
      std::atomic_thread_fence(std::memory_order_seq_cst);
      auto job =
          deq[local_bot].job.load(std::memory_order_relaxed);  // atomic load
      auto old_age = age.load(std::memory_order_relaxed);      // atomic load
      if (local_bot > old_age.top)
        result = job;
      else {
        bot.store(0, std::memory_order_relaxed);  // shared store
        auto new_age = age_t{old_age.tag + 1, 0};
        if ((local_bot == old_age.top) &&
            age.compare_exchange_strong(old_age, new_age))
          result = job;
        else {
          age.store(new_age, std::memory_order_relaxed);  // shared store
          result = nullptr;
        }
        std::atomic_thread_fence(std::memory_order_seq_cst);
      }
    }
    return result;
  }
};
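// Usage sketch (editor's illustration, not part of the original file):
// the owning worker pushes and pops at the bottom end; thieves race at
// the top end.
//
//   Deque<WorkStealingJob> d;
//   d.push_bottom(job);                        // owner only
//   WorkStealingJob* mine = d.pop_bottom();    // owner; the CAS on age
//                                              // arbitrates the race for
//                                              // the last remaining job
//   WorkStealingJob* theft = d.pop_top();      // any thief; nullptr if it
//                                              // loses the race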
template <typename Job>
struct scheduler {
 public:
  // see comments under wait(..)
  static bool const conservative = false;
  unsigned int num_threads;

  static thread_local unsigned int thread_id;

#ifdef SAGE
  static thread_local int numa_node;
  uint32_t num_numa_node;
  int thread_per_node;  // assume all numa nodes have the same number of threads
#endif

  scheduler()
      : num_threads(init_num_workers()),
        num_deques(num_threads),  // changed! removed * 2
        deques(num_deques),
        attempts(num_deques),
        spawned_threads(),
        finished_flag(false) {
    // Stopping condition
    auto finished = [this]() {
      return finished_flag.load(std::memory_order_relaxed);
    };

    // Spawn num_threads many threads on startup
    thread_id = 0;  // thread-local write
#ifdef SAGE
    num_numa_node = 1 + numa_max_node();
    thread_per_node = num_threads / num_numa_node;
    // std::cout << "from scheduler: " << num_numa_node << " " << thread_per_node << " " << num_threads << std::endl;
    mailboxes = std::vector<Job*>(num_threads);
    numa_nodes = std::vector<int>(num_threads);
    sorted_threads = std::vector<int>(num_threads);
    numa_node = numa_node_of_cpu(0);
    numa_nodes[0] = numa_node;
    sorted_threads[0] = 0;
    for (unsigned int i = 1; i < num_threads; i++) {
      sorted_threads[i] = i;
      mailboxes[i] = nullptr;
      spawned_threads.emplace_back([&, i, finished]() {
        thread_id = i;  // thread-local write
        numa_node = numa_node_of_cpu(i);
        // std::cout << "numa_node: " << numa_node << std::endl;
        numa_nodes[i] = numa_node;
        start(finished);
      });
      // Pin worker i to CPU i.
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      CPU_SET(i, &cpuset);
      pthread_setaffinity_np(spawned_threads[i - 1].native_handle(),
                             sizeof(cpu_set_t), &cpuset);
    }
    {
      // Pin the main thread to CPU 0.
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      CPU_SET(0, &cpuset);
      pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    }
    struct customLess {
      int* numa_nodes;
      customLess(int* v) : numa_nodes(v) {}
      bool operator()(int a, int b) const {
        return numa_nodes[a] < numa_nodes[b];
      }
    };
    // Group worker ids so that each node's workers are contiguous in
    // sorted_threads.  NOTE(editor): the spawned workers write
    // numa_nodes[i] concurrently with this sort and nothing synchronizes
    // the two, so this read can race.
    std::sort(sorted_threads.begin(), sorted_threads.end(),
              customLess(numa_nodes.data()));  // why reached here twice?
    for (int i = 0; i < num_deques; ++i) {
      std::cout << sorted_threads[i] << " " << numa_nodes[sorted_threads[i]]
                << std::endl;
    }
    sort_done = true;
#else
    for (unsigned int i = 1; i < num_threads; i++) {
      spawned_threads.emplace_back([&, i, finished]() {
        thread_id = i;  // thread-local write
        start(finished);
      });
    }
#endif
  }
#ifdef SAGE
  int numanode() {
    return numa_node;
  }

  // Try to place a job directly into worker id's one-slot mailbox.
  // The CAS fails (and the job is not sent) if the slot is occupied.
  void send(Job* job, int id) {
    bool succ = cas((size_t*)(&(mailboxes[id])), (size_t)nullptr, (size_t)job);
    std::cout << "sent job: " << succ << std::endl;
  }
#endif

  ~scheduler() {
    finished_flag.store(true, std::memory_order_relaxed);
    for (unsigned int i = 1; i < num_threads; i++) {
      spawned_threads[i - 1].join();
    }
  }

  // Push onto local stack.
  void spawn(Job* job) {
    int id = worker_id();
    deques[id].push_bottom(job);
  }

  // Wait for condition: finished().
  template <typename F>
  void wait(F finished, bool conservative = false) {
    // Conservative avoids deadlock if scheduler is used in conjunction
    // with user locks enclosing a wait.
    if (conservative) {
      while (!finished()) std::this_thread::yield();
    }
    // If not conservative, schedule within the wait.
    // Can deadlock if a stolen job uses same lock as encloses the wait.
    else
      start(finished);
  }
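  // Illustration of the conservative case (editor's sketch, hypothetical
  // user code): if a lock is held across the wait,
  //
  //   std::mutex m;
  //   m.lock();
  //   sched.wait(done, /*conservative=*/true);
  //   m.unlock();
  //
  // then the non-conservative path could steal and run, on this same
  // thread, a job that also locks m, deadlocking.  Conservative waiting
  // only yields, so it is safe but forfeits useful work.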
  // All scheduler threads quit after this is called.
  void finish() { finished_flag.store(true, std::memory_order_relaxed); }

#ifdef SAGE
  // Pop from local stack; fall back to the mailbox if the deque is empty.
  Job* try_pop() {
    int id = worker_id();
    Job* job = deques[id].pop_bottom();
    if (job == nullptr) {
      if (mailboxes[id]) {
        job = mailboxes[id];
        cas((size_t*)(&(mailboxes[id])), (size_t)job, (size_t)nullptr);
      }
    }
    return job;
  }
#else
  // Pop from local stack.
  Job* try_pop() {
    auto id = worker_id();
    return deques[id].pop_bottom();
  }
#endif

#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4996)  // 'getenv': This function or variable may be unsafe.
#endif

  // Determine the number of workers to spawn
  unsigned int init_num_workers() {
    if (const auto env_p = std::getenv("PARLAY_NUM_THREADS")) {
      return std::stoi(env_p);
    } else {
      return std::thread::hardware_concurrency();
    }
  }

#ifdef _MSC_VER
#pragma warning(pop)
#endif

  unsigned int num_workers() { return num_threads; }
  unsigned int worker_id() { return thread_id; }

  void set_num_workers(unsigned int) {
    std::cout << "Unsupported" << std::endl;
    exit(-1);
  }
 private:
  // Align to avoid false sharing.
  struct alignas(128) attempt {
    size_t val;
  };

  int num_deques;
  std::vector<Deque<Job>> deques;
  std::vector<attempt> attempts;
  std::vector<std::thread> spawned_threads;
  std::atomic<int> finished_flag;
#ifdef SAGE
  std::vector<Job*> mailboxes;
  std::vector<int> numa_nodes;
  std::vector<int> sorted_threads;  // sorted by numa_node
  bool sort_done = false;
#endif

  // Start an individual scheduler task.  Runs until finished().
  template <typename F>
  void start(F finished) {
    while (true) {
      Job* job = get_job(finished);
      if (!job) return;
      (*job)();
    }
  }

#ifdef SAGE
  // assume all numa nodes have the same number of threads
  // only steal from threads in the same numa node
  Job* try_steal(size_t id) {
    // use hashing to get "random" target within this worker's node
    size_t target =
        (hash(id) + hash(attempts[id].val)) % (num_deques / num_numa_node);
    target = sorted_threads[numa_nodes[id] * num_deques / num_numa_node + target];
    attempts[id].val++;
    // if (sort_done && numa_nodes[id] != numa_nodes[target]) {
    //   std::cout << thread_id << " " << id << " steals " << target << std::endl;
    //   std::cout << numa_nodes[id] << numa_nodes[target] << std::endl;
    // }
    return deques[target].pop_top();
  }
#else
  Job* try_steal(size_t id) {
    // use hashing to get "random" target
    size_t target = (hash(id) + hash(attempts[id].val)) % num_deques;
    attempts[id].val++;
    return deques[target].pop_top();
  }
#endif
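  // Worked example of the NUMA-restricted victim choice (editor's
  // illustration): with num_deques = 8 workers over num_numa_node = 2
  // nodes, num_deques / num_numa_node = 4.  A worker on node 1 draws a
  // pseudo-random offset in [0, 4) and reads
  // sorted_threads[1 * 4 + offset], i.e. one of the four workers that the
  // constructor's sort grouped under node 1, so steals never cross a node
  // boundary.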
  // Find a job, first trying local stack, then random steals.
  template <typename F>
  Job* get_job(F finished) {
    if (finished()) return nullptr;
    Job* job = try_pop();
    if (job) return job;
    size_t id = worker_id();
    while (true) {
      job = try_pop();
      if (job) return job;  // catch mailbox jobs, ok to add this?
      // By coupon collector's problem, this should touch all.
      for (int i = 0; i <= num_deques * 100; i++) {
        if (finished()) return nullptr;
        job = try_steal(id);
        if (job) return job;
      }
      // If haven't found anything, take a breather.
      std::this_thread::sleep_for(std::chrono::nanoseconds(num_deques * 100));
    }
  }
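  // The mixer below appears to be (editor's attribution) the 64-bit
  // finalizer from splitmix64; it spreads consecutive ids and attempt
  // counts into pseudo-random victim indices.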
  size_t hash(uint64_t x) {
    x = (x ^ (x >> 30)) * 0xbf58476d1ce4e5b9ULL;
    x = (x ^ (x >> 27)) * 0x94d049bb133111ebULL;
    x = x ^ (x >> 31);
    return static_cast<size_t>(x);
  }
};

template <typename T>
thread_local unsigned int scheduler<T>::thread_id = 0;

#ifdef SAGE
template <typename T>
thread_local int scheduler<T>::numa_node = 0;
#endif

class fork_join_scheduler {
  using Job = WorkStealingJob;

  // Underlying scheduler object
  std::unique_ptr<scheduler<Job>> sched;

 public:
  fork_join_scheduler() : sched(std::make_unique<scheduler<Job>>()) {}

  unsigned int num_workers() { return sched->num_workers(); }
  unsigned int worker_id() { return sched->worker_id(); }
  void set_num_workers(int n) { sched->set_num_workers(n); }

#ifdef SAGE
  int numanode() { return sched->numanode(); }
  void send(Job* job, int id) { sched->send(job, id); }
#endif

  // Fork two thunks and wait until they both finish.
  template <typename L, typename R>
  void pardo(L left, R right, bool conservative = false) {
    auto right_job = make_job(right);
    sched->spawn(&right_job);
    left();
    // If the spawned job was not stolen, run it inline;
    // otherwise wait for the thief to finish it.
    if (sched->try_pop() != nullptr)
      right();
    else {
      auto finished = [&]() { return right_job.finished(); };
      sched->wait(finished, conservative);
    }
  }
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4267)  // conversion from 'size_t' to *, possible loss of data
#endif

  // Run f on doubling batch sizes until a batch takes long enough to
  // time reliably; returns how many iterations were consumed.
  template <typename F>
  size_t get_granularity(size_t start, size_t end, F f) {
    size_t done = 0;
    size_t sz = 1;
    int ticks = 0;
    do {
      sz = std::min(sz, end - (start + done));
      auto tstart = std::chrono::high_resolution_clock::now();
      for (size_t i = 0; i < sz; i++) f(start + done + i);
      auto tstop = std::chrono::high_resolution_clock::now();
      ticks = static_cast<int>((tstop - tstart).count());
      done += sz;
      sz *= 2;
    } while (ticks < 1000 && done < (end - start));
    return done;
  }
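  // Worked example (editor's estimate, assuming the clock's ticks are
  // nanoseconds): if each f(i) takes about 10ns, batches of 1, 2, 4, ...
  // iterations run until one exceeds 1000 ticks; the 128-iteration batch
  // (~1280ns) crosses the threshold, so roughly 1 + 2 + ... + 128 = 255
  // iterations are consumed and returned as the calibrated granularity.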
  template <typename F>
  void parfor(size_t start, size_t end, F f, size_t granularity = 0,
              bool conservative = false) {
    if (end <= start) return;
    if (granularity == 0) {
      size_t done = get_granularity(start, end, f);
      granularity = std::max(done, (end - start) / (128 * sched->num_threads));
      parfor_(start + done, end, f, granularity, conservative);
    } else
      parfor_(start, end, f, granularity, conservative);
  }

 private:
  template <typename F>
  void parfor_(size_t start, size_t end, F f, size_t granularity,
               bool conservative) {
    if ((end - start) <= granularity)
      for (size_t i = start; i < end; i++) f(i);
    else {
      size_t n = end - start;
      // Not in middle to avoid clashes on set-associative
      // caches on powers of 2.
      size_t mid = (start + (9 * (n + 1)) / 16);
      pardo([&]() { parfor_(start, mid, f, granularity, conservative); },
            [&]() { parfor_(mid, end, f, granularity, conservative); },
            conservative);
    }
  }
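  // Example of the off-center split (editor's illustration): for
  // start = 0, end = 1024, mid = (9 * 1025) / 16 = 576 rather than 512,
  // so recursive subranges do not keep landing on power-of-two boundaries
  // that map to the same cache sets.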
#ifdef _MSC_VER
#pragma warning(pop)
#endif
};

}  // namespace parlay

#endif  // PARLAY_SCHEDULER_H_