Viewing file: TaskQueue.h (4.23 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===// // // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. // See https://llvm.org/LICENSE.txt for license information. // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception // //===----------------------------------------------------------------------===// // // This file defines a crude C++11 based task queue. // //===----------------------------------------------------------------------===//
#ifndef LLVM_SUPPORT_TASKQUEUE_H #define LLVM_SUPPORT_TASKQUEUE_H
#include "llvm/Config/llvm-config.h" #include "llvm/Support/ThreadPool.h" #include "llvm/Support/thread.h"
#include <atomic> #include <cassert> #include <condition_variable> #include <deque> #include <functional> #include <future> #include <memory> #include <mutex> #include <utility>
namespace llvm { /// TaskQueue executes serialized work on a user-defined Thread Pool. It /// guarantees that if task B is enqueued after task A, task B begins after /// task A completes and there is no overlap between the two. class TaskQueue { // Because we don't have init capture to use move-only local variables that // are captured into a lambda, we create the promise inside an explicit // callable struct. We want to do as much of the wrapping in the // type-specialized domain (before type erasure) and then erase this into a // std::function. template <typename Callable> struct Task { using ResultTy = std::invoke_result_t<Callable>; explicit Task(Callable C, TaskQueue &Parent) : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()), Parent(&Parent) {}
template<typename T> void invokeCallbackAndSetPromise(T*) { P->set_value(C()); }
void invokeCallbackAndSetPromise(void*) { C(); P->set_value(); }
void operator()() noexcept { ResultTy *Dummy = nullptr; invokeCallbackAndSetPromise(Dummy); Parent->completeTask(); }
Callable C; std::shared_ptr<std::promise<ResultTy>> P; TaskQueue *Parent; };
public: /// Construct a task queue with no work. TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }
/// Blocking destructor: the queue will wait for all work to complete. ~TaskQueue() { Scheduler.wait(); assert(Tasks.empty()); }
/// Asynchronous submission of a task to the queue. The returned future can be /// used to wait for the task (and all previous tasks that have not yet /// completed) to finish. template <typename Callable> std::future<std::invoke_result_t<Callable>> async(Callable &&C) { #if !LLVM_ENABLE_THREADS static_assert(false, "TaskQueue requires building with LLVM_ENABLE_THREADS!"); #endif Task<Callable> T{std::move(C), *this}; using ResultTy = std::invoke_result_t<Callable>; std::future<ResultTy> F = T.P->get_future(); { std::lock_guard<std::mutex> Lock(QueueLock); // If there's already a task in flight, just queue this one up. If // there is not a task in flight, bypass the queue and schedule this // task immediately. if (IsTaskInFlight) Tasks.push_back(std::move(T)); else { Scheduler.async(std::move(T)); IsTaskInFlight = true; } } return F; }
private: void completeTask() { // We just completed a task. If there are no more tasks in the queue, // update IsTaskInFlight to false and stop doing work. Otherwise // schedule the next task (while not holding the lock). std::function<void()> Continuation; { std::lock_guard<std::mutex> Lock(QueueLock); if (Tasks.empty()) { IsTaskInFlight = false; return; }
Continuation = std::move(Tasks.front()); Tasks.pop_front(); } Scheduler.async(std::move(Continuation)); }
/// The thread pool on which to run the work. ThreadPool &Scheduler;
/// State which indicates whether the queue currently is currently processing /// any work. bool IsTaskInFlight = false;
/// Mutex for synchronizing access to the Tasks array. std::mutex QueueLock;
/// Tasks waiting for execution in the queue. std::deque<std::function<void()>> Tasks; }; } // namespace llvm
#endif // LLVM_SUPPORT_TASKQUEUE_H
|