This is just a little cpp class to execute a function parallel with different parameters over and over again. It was developed to test some c++ features.
Compile: clang++-7 -Wall x.cpp -lpthread -std=c++17
| #include <iostream> | |
| #include <future> | |
| #include <vector> | |
| template <typename T> | |
| T calc(T a, T b) { | |
| T c = a + b; | |
| for (T i = 0; i < 1000000; i++) { | |
| c += b * i - 10; | |
| } | |
| return a + b + c; | |
| }; | |
| template <typename Result, typename... Parameters> | |
| class Executor { | |
| private: | |
| Result (*func)(Parameters...); | |
| std::vector<std::thread> threads; | |
| typedef std::unique_ptr<std::tuple< | |
| std::unique_ptr<std::promise<Result>>, | |
| std::unique_ptr<std::tuple<Parameters...>> | |
| >> vector_type; | |
| std::vector<vector_type> todo; | |
| std::mutex todo_mutex; | |
| std::condition_variable condition; | |
| bool data_ready = false; | |
| bool shutdown = false; | |
| void run() { | |
| std::cout << "run thread: " << std::this_thread::get_id() << std::endl; | |
| while (true) { | |
| vector_type t; | |
| { | |
| // Dont lock during function call | |
| std::unique_lock<std::mutex> uk(todo_mutex); | |
| condition.wait(uk, [this] { return data_ready; }); | |
| if (todo.empty()) { | |
| if (shutdown) { | |
| return; | |
| } | |
| data_ready = false; | |
| continue; | |
| } | |
| t = std::move(todo.back()); | |
| todo.pop_back(); | |
| } | |
| auto [prom, param] = std::move(*t); | |
| prom->set_value(std::apply(func, *param)); | |
| }; | |
| }; | |
| std::vector<std::unique_ptr<std::promise<Result>>> promises; | |
| public: | |
| Executor(size_t num_threads, Result (*f)(Parameters...)): func(f) { | |
| threads.reserve(num_threads); | |
| for (size_t i = 0; i < num_threads; i++) { | |
| threads.push_back(std::thread(&Executor::run, this)); | |
| } | |
| }; | |
| ~Executor() { | |
| { | |
| std::lock_guard<std::mutex> lk(todo_mutex); | |
| shutdown = true; | |
| data_ready = true; | |
| condition.notify_all(); | |
| } | |
| for (size_t i = 0; i < threads.size(); i++) { | |
| threads[i].join(); | |
| } | |
| }; | |
| std::future<Result> exec(Parameters... params) { | |
| auto prom = std::make_unique<std::promise<Result>>(); | |
| auto res = prom->get_future(); | |
| auto save = std::make_unique<std::tuple< | |
| std::unique_ptr<std::promise<Result>>, | |
| std::unique_ptr<std::tuple<Parameters...>>>>( | |
| std::make_tuple(std::move(prom), | |
| std::make_unique<std::tuple<Parameters...>>( | |
| std::make_tuple(params...)))); | |
| std::lock_guard<std::mutex> lk(todo_mutex); | |
| todo.push_back(std::move(save)); | |
| data_ready = true; | |
| condition.notify_one(); | |
| return res; | |
| } | |
| }; | |
| void fill(uint64_t runs, std::vector<std::future<float>>& results, Executor<float, float, float>& calcer) { | |
| results.reserve(runs); | |
| for (uint64_t i = 0; i < runs; i++) { | |
| results.push_back(calcer.exec(i, runs)); | |
| } | |
| } | |
| int main(int argc, char *argv[]) { | |
| Executor<float, float, float> calcer(std::thread::hardware_concurrency(), calc); | |
| std::cout << "main thread: " << std::this_thread::get_id() << std::endl; | |
| uint64_t runs = 10000; | |
| std::vector<std::future<float>> results1, results2; | |
| auto future1 = std::async([&] { | |
| fill(runs, results1, calcer); | |
| }); | |
| auto future2 = std::async([&] { | |
| fill(runs, results2, calcer); | |
| }); | |
| future1.get(); | |
| future2.get(); | |
| for (uint64_t i = 0; i < runs; i++) { | |
| results1[i].get(); | |
| results2[i].get(); | |
| } | |
| return 0; | |
| } |