[dawn][native] Add a Priority value for serial related tasks. - The serial related tasks may be ran on different threads in future changes, so adding the priority value for a lot of these tasks will allow us to process only critical tasks when dealing with user related tasks while still providing a way to make forward progress on other less urgent tasks such as deallocation/cleanup. Change-Id: I7f6a9a10d87d88f3cc412ef8b354bd704d55af08 Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/293797 Reviewed-by: Brandon Jones <bajones@chromium.org> Commit-Queue: Loko Kung <lokokung@google.com>
diff --git a/src/dawn/native/Device.cpp b/src/dawn/native/Device.cpp index 88edf6c..ef8fc6c 100644 --- a/src/dawn/native/Device.cpp +++ b/src/dawn/native/Device.cpp
@@ -1700,7 +1700,7 @@ // To avoid overly ticking, we only want to tick when: // 1. the last submitted serial has moved beyond the completed serial // 2. or the backend still has pending commands to submit. - DAWN_TRY(mQueue->UpdateCompletedSerial()); + DAWN_TRY(mQueue->UpdateCompletedSerial(QueuePriority::Lowest)); DAWN_TRY(TickImpl()); // TODO(crbug.com/dawn/833): decouple TickImpl from updating the serial so that we can
diff --git a/src/dawn/native/EventManager.cpp b/src/dawn/native/EventManager.cpp index 8f8346a..a46c3ba 100644 --- a/src/dawn/native/EventManager.cpp +++ b/src/dawn/native/EventManager.cpp
@@ -315,16 +315,18 @@ if (const auto* queueAndSerial = event->GetIfQueueAndSerial()) { if (auto q = queueAndSerial->queue.Promote()) { - q->TrackSerialTask(queueAndSerial->completionSerial, [this, event]() { - // If this is executed, we can be sure that the raw pointer to this EventManager is - // valid because the task is ran by the Queue and: - // Queue -[refs]-> - // Device -[refs]-> - // Adapter -[refs]-> - // Instance -[owns]-> - // EventManager. - SetFutureReady(event.Get()); - }); + q->TrackSerialTask(QueuePriority::UserVisible, queueAndSerial->completionSerial, + [this, event]() { + // If this is executed, we can be sure that the raw pointer to + // this EventManager is valid because the task is ran by the + // Queue and: + // Queue -[refs]-> + // Device -[refs]-> + // Adapter -[refs]-> + // Instance -[owns]-> + // EventManager. + SetFutureReady(event.Get()); + }); } }
diff --git a/src/dawn/native/ExecutionQueue.cpp b/src/dawn/native/ExecutionQueue.cpp index ebcec32..b9129aa 100644 --- a/src/dawn/native/ExecutionQueue.cpp +++ b/src/dawn/native/ExecutionQueue.cpp
@@ -47,8 +47,16 @@ } } // namespace +inline QueuePriority& operator-=(QueuePriority& priority, size_t i) { + DAWN_ASSERT(static_cast<size_t>(priority) >= i); + priority = static_cast<QueuePriority>(static_cast<size_t>(priority) - i); + return priority; +} + ExecutionQueueBase::~ExecutionQueueBase() { - DAWN_ASSERT(mState->mWaitingTasks.Empty()); + for (QueuePriority p = QueuePriority::Highest; p >= QueuePriority::Lowest; p -= 1) { + DAWN_ASSERT(mState->mWaitingTasks[p].Empty()); + } } ExecutionSerial ExecutionQueueBase::GetPendingCommandSerial() const { @@ -67,7 +75,7 @@ // Serial is already complete. if (waitSerial <= GetCompletedCommandSerial()) { // Ensure that all tasks related to the serial have been triggered. - UpdateCompletedSerialTo(waitSerial); + UpdateCompletedSerialTo(QueuePriority::UserVisible, waitSerial); return {}; } @@ -89,10 +97,10 @@ // Wait on the serial if it hasn't passed yet. ExecutionSerial completedSerial = kWaitSerialTimeout; DAWN_TRY_ASSIGN(completedSerial, WaitForQueueSerialImpl(waitSerial, timeout)); - UpdateCompletedSerialTo(completedSerial); + UpdateCompletedSerialTo(QueuePriority::UserVisible, completedSerial); return {}; } - return UpdateCompletedSerial(); + return UpdateCompletedSerial(QueuePriority::UserVisible); } else { // Otherwise, we need to acquire the device lock first. auto deviceGuard = GetDevice()->GetGuard(); @@ -114,7 +122,7 @@ // stale data. FetchMax(mCompletedSerial, uint64_t(completedSerial)); } - return UpdateCompletedSerial(); + return UpdateCompletedSerial(QueuePriority::UserVisible); } } @@ -129,7 +137,7 @@ IgnoreErrors(WaitForIdleForDestructionImpl()); // Prepare to call any remaining outstanding callbacks now. - std::vector<Ref<SerialProcessor>>* processors = nullptr; + QueuePriorityArray<std::vector<Ref<SerialProcessor>>>* processors = nullptr; std::vector<Task> tasks; ExecutionSerial serial; @@ -137,8 +145,6 @@ // Wait until we can exclusively call callbacks. state.Wait([](auto& x) { return !x.mCallingCallbacks; }); - processors = &state->mWaitingProcessors; - // We finish tasks all the way up to the pending command serial because otherwise, pending // tasks that may be for cleanup won't every be completed. Also, for |buffer.MapAsync|, a // lot of backends queue up a clear to initialize the data on those buffers and that clear @@ -147,17 +153,21 @@ // pending command is ever submitted, the map async task will be left dangling if we only // clear up to the completed serial. serial = GetPendingCommandSerial(); - PopWaitingTasksInto(serial, state->mWaitingTasks, tasks); - if (!processors->empty() || !tasks.empty()) { - state->mCallingCallbacks = true; + // Call all callbacks that for all priorities. + processors = &state->mWaitingProcessors; + for (QueuePriority p = QueuePriority::Highest; p >= QueuePriority::Lowest; p -= 1) { + PopWaitingTasksInto(serial, state->mWaitingTasks[p], tasks); } + state->mCallingCallbacks = true; }); // Always call the processors before processing individual tasks. DAWN_ASSERT(processors); - for (auto& processor : *processors) { - processor->UpdateCompletedSerialTo(serial); + for (QueuePriority p = QueuePriority::Highest; p >= QueuePriority::Lowest; p -= 1) { + for (auto& processor : (*processors)[p]) { + processor->UpdateCompletedSerialTo(serial); + } } // Call the callbacks without holding the lock on the ExecutionQueue to avoid lock-inversion @@ -166,9 +176,7 @@ task(); } - if (!processors->empty() || !tasks.empty()) { - mState->mCallingCallbacks = false; - } + mState->mCallingCallbacks = false; return {}; } @@ -184,30 +192,34 @@ return {}; } -MaybeError ExecutionQueueBase::UpdateCompletedSerial() { +MaybeError ExecutionQueueBase::UpdateCompletedSerial(QueuePriority priority) { ExecutionSerial completedSerial; DAWN_TRY_ASSIGN(completedSerial, CheckAndUpdateCompletedSerials()); DAWN_ASSERT(completedSerial <= ExecutionSerial(mLastSubmittedSerial.load(std::memory_order_acquire))); - UpdateCompletedSerialTo(completedSerial); + UpdateCompletedSerialTo(priority, completedSerial); return {}; } -void ExecutionQueueBase::RegisterSerialProcessor(Ref<SerialProcessor>&& serialProcessor) { +void ExecutionQueueBase::RegisterSerialProcessor(QueuePriority priority, + Ref<SerialProcessor>&& serialProcessor) { // Serial processor registration should always happen at queue initialization. DAWN_ASSERT(mCompletedSerial == static_cast<uint64_t>(kBeginningOfGPUTime)); - mState.Use<NotifyType::None>( - [&](auto state) { state->mWaitingProcessors.push_back(std::move(serialProcessor)); }); + mState.Use<NotifyType::None>([&](auto state) { + state->mWaitingProcessors[priority].push_back(std::move(serialProcessor)); + }); } // Tasks may execute synchronously if the given serial has already passed or during device // destruction. As a result, callers should ensure that the calling thread releases any locks that // will be taken by the task prior to calling TrackSerialTask. -void ExecutionQueueBase::TrackSerialTask(ExecutionSerial serial, Task&& task) { +void ExecutionQueueBase::TrackSerialTask(QueuePriority priority, + ExecutionSerial serial, + Task&& task) { bool tracked = mState.Use<NotifyType::None>([&](auto state) { if (!state->mAssumeCompleted && serial > GetCompletedCommandSerial()) { - state->mWaitingTasks.Enqueue(std::move(task), serial); + state->mWaitingTasks[priority].Enqueue(std::move(task), serial); return true; } return false; @@ -217,13 +229,15 @@ } } -void ExecutionQueueBase::UpdateCompletedSerialTo(ExecutionSerial completedSerial) { - UpdateCompletedSerialToInternal(completedSerial); +void ExecutionQueueBase::UpdateCompletedSerialTo(QueuePriority priority, + ExecutionSerial completedSerial) { + UpdateCompletedSerialToInternal(priority, completedSerial); } -void ExecutionQueueBase::UpdateCompletedSerialToInternal(ExecutionSerial completedSerial, +void ExecutionQueueBase::UpdateCompletedSerialToInternal(QueuePriority priority, + ExecutionSerial completedSerial, bool forceTasks) { - std::vector<Ref<SerialProcessor>>* processors = nullptr; + QueuePriorityArray<std::vector<Ref<SerialProcessor>>>* processors = nullptr; std::vector<Task> tasks; ExecutionSerial serial = completedSerial; @@ -246,20 +260,22 @@ // Wait until we can exclusively call callbacks. state.Wait([](auto& x) { return !x.mCallingCallbacks; }); - - processors = &state->mWaitingProcessors; - serial = GetCompletedCommandSerial(); - PopWaitingTasksInto(serial, state->mWaitingTasks, tasks); - if (!processors->empty() || !tasks.empty()) { - state->mCallingCallbacks = true; + + // Call all callbacks that for the given priority and anything of higher priority as well. + processors = &state->mWaitingProcessors; + for (QueuePriority p = QueuePriority::Highest; p >= priority; p -= 1) { + PopWaitingTasksInto(serial, state->mWaitingTasks[p], tasks); } + state->mCallingCallbacks = true; }); // Always call the processors before processing individual tasks. if (processors) { - for (auto& processor : *processors) { - processor->UpdateCompletedSerialTo(serial); + for (QueuePriority p = QueuePriority::Highest; p >= priority; p -= 1) { + for (auto& processor : (*processors)[p]) { + processor->UpdateCompletedSerialTo(serial); + } } } @@ -269,9 +285,7 @@ task(); } - if ((processors && !processors->empty()) || !tasks.empty()) { - mState->mCallingCallbacks = false; - } + mState->mCallingCallbacks = false; } MaybeError ExecutionQueueBase::EnsureCommandsFlushed(ExecutionSerial serial) { @@ -298,19 +312,17 @@ ExecutionSerial(mLastSubmittedSerial.fetch_add(1u, std::memory_order_release) + 1); // Force any waiting tasks to execute. This will ensure that any tasks that were scheduled // after WaitForIdleForDestruction being called are completed. - UpdateCompletedSerialToInternal(completed, true); + UpdateCompletedSerialToInternal(QueuePriority::Lowest, completed, true); // Update all the processors to let them know that they should assume commands are complete, // then release our reference to them. std::vector<Ref<SerialProcessor>> processors; mState.Use<NotifyType::None>([&](auto state) { - // Since we don't hold the lock while accessing the list of processes and instead rely on - // |mCallingCallbacks|, we need to wait until we are no longer calling callbacks before - // resetting the vector. - state.Wait([](auto& x) { return !x.mCallingCallbacks; }); - - processors = std::move(state->mWaitingProcessors); - state->mWaitingProcessors.clear(); + for (QueuePriority p = QueuePriority::Highest; p >= QueuePriority::Lowest; p -= 1) { + processors.insert(processors.end(), state->mWaitingProcessors[p].begin(), + state->mWaitingProcessors[p].end()); + state->mWaitingProcessors[p].clear(); + } }); for (auto& processor : processors) { processor->AssumeCommandsComplete();
diff --git a/src/dawn/native/ExecutionQueue.h b/src/dawn/native/ExecutionQueue.h index 0c92860..165ee86 100644 --- a/src/dawn/native/ExecutionQueue.h +++ b/src/dawn/native/ExecutionQueue.h
@@ -38,6 +38,7 @@ #include "dawn/common/RefCounted.h" #include "dawn/common/SerialMap.h" #include "dawn/common/Time.h" +#include "dawn/common/ityp_array.h" #include "dawn/native/Error.h" #include "dawn/native/IntegerTypes.h" #include "dawn/native/ObjectBase.h" @@ -45,6 +46,26 @@ namespace dawn::native { +// Queue task priority can be specified such that when user facing code is called, i.e. |WaitAny|, +// we can do the minimum amount of work to remain responsive. For other classes of priority, we may +// defer and execute them either on another thread if full spontaneous is enabled, or when +// |Tick| is called in the meantime. Higher numeric values correspond to higher priority. Note that +// this is loosely based off of TaskPriority in Chromium: +// https://source.chromium.org/chromium/chromium/src/+/main:base/task/task_traits.h +// Note that when the queue is asked to process tasks of a given priorty, it will process any tasks +// of that priority or higher. +enum class QueuePriority : size_t { + // This will always be the lowest priority available. We use 1 instead of 0 because we implement + // a decrement operator to iterate through the priorities and by reserving 0, we can assert that + // we don't underflow. + Lowest = 1, + + BestEffort = Lowest, + UserVisible, + + Highest = UserVisible, +}; + // Represents an engine which processes a stream of GPU work. It handles the tracking and // update of the various ExecutionSerials related to that work. // TODO(dawn:831, dawn:1413): Make usage of the ExecutionQueue thread-safe. Right now it is @@ -87,7 +108,7 @@ // should not be used anymore. // TODO(crbug.com/42240396): Remove |CheckPassedSerials| in favor of |UpdateCompletedSerial|. MaybeError CheckPassedSerials(); - MaybeError UpdateCompletedSerial(); + MaybeError UpdateCompletedSerial(QueuePriority priority); // For the commands being internally recorded in backend, that were not urgent to submit, this // method makes them to be submitted as soon as possible in next ticks. @@ -117,10 +138,10 @@ // Registers a SerialProcessor that will be notified of serial updates. Note that serial // processors are always notified before handling serial tasks. - void RegisterSerialProcessor(Ref<SerialProcessor>&& serialProcessor); + void RegisterSerialProcessor(QueuePriority priority, Ref<SerialProcessor>&& serialProcessor); // Tracks new tasks to complete when |serial| is reached. - void TrackSerialTask(ExecutionSerial serial, Task&& task); + void TrackSerialTask(QueuePriority priority, ExecutionSerial serial, Task&& task); // In the 'Normal' mode, currently recorded commands in the backend submitted in the next Tick. // However in the 'Passive' mode, the submission will be postponed as late as possible, for @@ -138,7 +159,7 @@ // calls into the backend specific polling mechanisms implemented in // CheckAndUpdateCompletedSerials. Alternatively, the backend can actively call // UpdateCompletedSerialTo when a new serial is complete to make forward progress proactively. - void UpdateCompletedSerialTo(ExecutionSerial completedSerial); + void UpdateCompletedSerialTo(QueuePriority priority, ExecutionSerial completedSerial); private: // Each backend should implement to check their passed fences if there are any and return a @@ -159,7 +180,9 @@ // Submit any pending commands that are enqueued. virtual MaybeError SubmitPendingCommandsImpl() = 0; - void UpdateCompletedSerialToInternal(ExecutionSerial completedSerial, bool forceTasks = false); + void UpdateCompletedSerialToInternal(QueuePriority priority, + ExecutionSerial completedSerial, + bool forceTasks = false); // |mCompletedSerial| tracks the last completed command serial that the fence has returned. This // is currently implicitly guarded by the lock for |mWaitingTasks| since we only update this @@ -175,12 +198,16 @@ // 1) Callback ordering is guaranteed. // 2) Re-entrant callbacks do not cause lock-inversion issues w.r.t this lock and the // device lock. + template <typename T> + using QueuePriorityArray = + ityp::array<QueuePriority, T, static_cast<size_t>(QueuePriority::Highest) + 1>; + struct State { bool mCallingCallbacks = false; bool mWaitingForIdle = false; bool mAssumeCompleted = false; - std::vector<Ref<SerialProcessor>> mWaitingProcessors; - SerialMap<ExecutionSerial, Task> mWaitingTasks; + QueuePriorityArray<std::vector<Ref<SerialProcessor>>> mWaitingProcessors; + QueuePriorityArray<SerialMap<ExecutionSerial, Task>> mWaitingTasks; }; MutexCondVarProtected<State> mState; };
diff --git a/src/dawn/native/metal/QueueMTL.mm b/src/dawn/native/metal/QueueMTL.mm index 4899ddb..cafb552 100644 --- a/src/dawn/native/metal/QueueMTL.mm +++ b/src/dawn/native/metal/QueueMTL.mm
@@ -251,7 +251,7 @@ TRACE_EVENT_ASYNC_END0(platform, GPUWork, "DeviceMTL::SubmitPendingCommandBuffer", uint64_t(pendingSerial)); - this->UpdateCompletedSerialTo(pendingSerial); + this->UpdateCompletedSerialTo(QueuePriority::Lowest, pendingSerial); this->UpdateCommandsCompletedEvents(pendingSerial); }];
diff --git a/src/dawn/native/vulkan/DeviceVk.cpp b/src/dawn/native/vulkan/DeviceVk.cpp index ca0a349..c4e7aa7 100644 --- a/src/dawn/native/vulkan/DeviceVk.cpp +++ b/src/dawn/native/vulkan/DeviceVk.cpp
@@ -187,7 +187,7 @@ Ref<Queue> queue; DAWN_TRY_ASSIGN(queue, Queue::Create(this, &descriptor->defaultQueue, mMainQueueFamily)); - queue->RegisterSerialProcessor(mDeleter); + queue->RegisterSerialProcessor(QueuePriority::BestEffort, mDeleter); if (HasFeature(Feature::ChromiumExperimentalSamplingResourceTable)) { DAWN_TRY_ASSIGN(mResourceTableLayout, ResourceTable::MakeDescriptorSetLayout(this));
diff --git a/src/dawn/native/vulkan/QueueVk.cpp b/src/dawn/native/vulkan/QueueVk.cpp index f9e2c6a..512a28f 100644 --- a/src/dawn/native/vulkan/QueueVk.cpp +++ b/src/dawn/native/vulkan/QueueVk.cpp
@@ -344,7 +344,7 @@ for (size_t i = 0; i < mRecordingContext.commandBufferList.size(); ++i) { CommandPoolAndBuffer commands = {mRecordingContext.commandPoolList[i], mRecordingContext.commandBufferList[i]}; - TrackSerialTask(lastSubmittedSerial, [commands, this]() { + TrackSerialTask(QueuePriority::UserVisible, lastSubmittedSerial, [commands, this]() { Device* device = ToBackend(GetDevice()); VkDevice vkDevice = device->GetVkDevice();