[dawn][native] Adds task tracking in ExecutionQueueBase. - Implements the necessary tools in ExecutionQueueBase as described in the [design doc](https://docs.google.com/document/d/1o_1SvkpnWq0ysOxU5jJLkWKjBrLG5eUHqmWQSjQiu2s/edit?usp=sharing). - Uses WeakRefs for Queue based events to avoid ref-cycle of QueueBase -[implements]-> ExecutionQueueBase -[owns]-> Task -[refs]-> TrackedEvent -[refs]-> QueueBase - Much of the task tracking is based on the CallbackTaskManager which we can hopefully remove once more refactoring is done. - Introduces a mechanism for the backend to proactively update the completed serial while also tracking queue event futures. This way, when the backend notifies that a particular serial is passed, we can flush all future callbacks up to that serial. - Ensure that once the instance is dropped, all non-spontaneous event callbacks are called. - Disables some Spontaneous tests since they won't work anymore now that we are in the middle of implementing the infra to enable AllowSpontaneous. Bug: 412761228 Change-Id: I1650ab2ef1fa60837553d7bd3a5f6e0b4e7ad3b7 Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/239115 Reviewed-by: Corentin Wallez <cwallez@chromium.org> Reviewed-by: Kai Ninomiya <kainino@chromium.org> Commit-Queue: Loko Kung <lokokung@google.com>
diff --git a/src/dawn/native/Buffer.cpp b/src/dawn/native/Buffer.cpp index 092ab2e..946e5a8 100644 --- a/src/dawn/native/Buffer.cpp +++ b/src/dawn/native/Buffer.cpp
@@ -225,9 +225,11 @@ void Complete(EventCompletionType completionType) override { if (const auto* queueAndSerial = GetIfQueueAndSerial()) { - TRACE_EVENT_ASYNC_END0(queueAndSerial->queue->GetDevice()->GetPlatform(), General, - "Buffer::APIMapAsync", - uint64_t(queueAndSerial->completionSerial)); + if (auto queue = queueAndSerial->queue.Promote()) { + TRACE_EVENT_ASYNC_END0(queue->GetDevice()->GetPlatform(), General, + "Buffer::APIMapAsync", + uint64_t(queueAndSerial->completionSerial)); + } } void* userdata1 = mUserdata1.ExtractAsDangling();
diff --git a/src/dawn/native/EventManager.cpp b/src/dawn/native/EventManager.cpp index 2780fe2..62ff4565 100644 --- a/src/dawn/native/EventManager.cpp +++ b/src/dawn/native/EventManager.cpp
@@ -145,14 +145,21 @@ } // Wait/poll queues with given `timeout`. `queueWaitSerials` should contain per queue, the serial up -// to which we should flush the queue if needed. -using QueueWaitSerialsMap = absl::flat_hash_map<QueueBase*, ExecutionSerial>; +// to which we should flush the queue if needed. Note that keys are WeakRef<QueueBase> which +// actually means the keys are not based on the QueueBase pointer, but a pointer to metadata that is +// guaranteed to be unique and alive. This ensures that each queue will be represented for multi +// source validation. +using QueueWaitSerialsMap = absl::flat_hash_map<WeakRef<QueueBase>, ExecutionSerial>; void WaitQueueSerials(const QueueWaitSerialsMap& queueWaitSerials, Nanoseconds timeout) { // TODO(dawn:1662): Make error handling thread-safe. // Poll/wait on queues up to the lowest wait serial, but do this once per queue instead of // per event so that events with same serial complete at the same time instead of racing. for (const auto& queueAndSerial : queueWaitSerials) { - auto* queue = queueAndSerial.first; + auto queue = queueAndSerial.first.Promote(); + if (queue == nullptr) { + // If we can't promote the queue, then all the work is already done. + continue; + } auto waitSerial = queueAndSerial.second; auto* device = queue->GetDevice(); @@ -176,7 +183,7 @@ } return {}; }(), - "waiting for work in %s.", queue); + "waiting for work in %s.", queue.Get()); } } @@ -186,8 +193,8 @@ Nanoseconds timeout) { bool foundSystemEvent = false; bool foundWaitListEvent = false; - QueueWaitSerialsMap queueLowestWaitSerials; + QueueWaitSerialsMap queueLowestWaitSerials; for (const auto& future : futures) { if (future.event->GetIfSystemEvent()) { foundSystemEvent = true; @@ -197,7 +204,7 @@ } if (const auto* queueAndSerial = future.event->GetIfQueueAndSerial()) { auto [it, inserted] = queueLowestWaitSerials.insert( - {queueAndSerial->queue.Get(), queueAndSerial->completionSerial}); + {queueAndSerial->queue, queueAndSerial->completionSerial}); if (!inserted) { it->second = std::min(it->second, queueAndSerial->completionSerial); } @@ -295,7 +302,15 @@ } void EventManager::ShutDown() { - mEvents.Use([&](auto events) { (*events).reset(); }); + mEvents.Use([&](auto events) { + // For all non-spontaneous events, call their callbacks now. + for (auto& [futureID, event] : **events) { + if (event->mCallbackMode != wgpu::CallbackMode::AllowSpontaneous) { + event->EnsureComplete(EventCompletionType::Shutdown); + } + } + (*events).reset(); + }); } bool EventManager::IsShutDown() const { @@ -321,8 +336,29 @@ } } + if (const auto* queueAndSerial = event->GetIfQueueAndSerial()) { + if (auto q = queueAndSerial->queue.Promote()) { + q->TrackSerialTask(queueAndSerial->completionSerial, [this, event]() { + // If this is executed, we can be sure that the raw pointer to this EventManager is + // valid because the Queue is alive and: + // Queue -[refs]-> + // Device -[refs]-> + // Adapter -[refs]-> + // Instance -[owns]-> + // EventManager. + SetFutureReady(event.Get()); + }); + } + } + mEvents.Use([&](auto events) { if (!events->has_value()) { + // We are shutting down, so if the event isn't spontaneous, call the callback now. + if (event->mCallbackMode != wgpu::CallbackMode::AllowSpontaneous) { + event->EnsureComplete(EventCompletionType::Shutdown); + } + // Otherwise, in native, the event manager is not in charge of tracking the event, so + // just return early now. return; } if (event->mCallbackMode != wgpu::CallbackMode::WaitAnyOnly) { @@ -338,6 +374,8 @@ } void EventManager::SetFutureReady(TrackedEvent* event) { + event->SetReadyToComplete(); + // Sometimes, events might become ready before they are even tracked. This can happen because // tracking is ordered to uphold callback ordering, but events may become ready in any order. If // the event is spontaneous, it will be completed when it is tracked. @@ -345,8 +383,6 @@ return; } - event->SetReadyToComplete(); - // Handle spontaneous completion now. if (event->mCallbackMode == wgpu::CallbackMode::AllowSpontaneous) { mEvents.Use([&](auto events) { @@ -502,6 +538,15 @@ return wgpu::WaitStatus::Success; } +// QueueAndSerial + +ExecutionSerial QueueAndSerial::GetCompletedSerial() const { + if (auto q = queue.Promote()) { + return q->GetCompletedCommandSerial(); + } + return completionSerial; +} + // EventManager::TrackedEvent EventManager::TrackedEvent::TrackedEvent(wgpu::CallbackMode callbackMode, @@ -515,7 +560,8 @@ EventManager::TrackedEvent::TrackedEvent(wgpu::CallbackMode callbackMode, QueueBase* queue, ExecutionSerial completionSerial) - : mCallbackMode(callbackMode), mCompletionData(QueueAndSerial{queue, completionSerial}) {} + : mCallbackMode(callbackMode), + mCompletionData(QueueAndSerial{GetWeakRef(queue), completionSerial}) {} EventManager::TrackedEvent::TrackedEvent(wgpu::CallbackMode callbackMode, Completed tag) : mCallbackMode(callbackMode), mCompletionData(AcquireRef(new WaitListEvent())) { @@ -545,8 +591,7 @@ isReady = event->IsSignaled(); } if (const auto* queueAndSerial = GetIfQueueAndSerial()) { - isReady = - queueAndSerial->completionSerial <= queueAndSerial->queue->GetCompletedCommandSerial(); + isReady = queueAndSerial->completionSerial <= queueAndSerial->GetCompletedSerial(); } return isReady; } @@ -559,7 +604,7 @@ event->Signal(); } if (auto* queueAndSerial = std::get_if<QueueAndSerial>(&mCompletionData)) { - queueAndSerial->completionSerial = queueAndSerial->queue->GetCompletedCommandSerial(); + queueAndSerial->completionSerial = queueAndSerial->GetCompletedSerial(); } }
diff --git a/src/dawn/native/EventManager.h b/src/dawn/native/EventManager.h index a6a4ec0..90ac919 100644 --- a/src/dawn/native/EventManager.h +++ b/src/dawn/native/EventManager.h
@@ -39,9 +39,11 @@ #include "dawn/common/MutexProtected.h" #include "dawn/common/NonMovable.h" #include "dawn/common/Ref.h" +#include "dawn/common/WeakRef.h" #include "dawn/native/Error.h" #include "dawn/native/Forward.h" #include "dawn/native/IntegerTypes.h" +#include "dawn/native/Queue.h" #include "dawn/native/SystemEvent.h" #include "dawn/native/WaitListEvent.h" #include "partition_alloc/pointers/raw_ptr.h" @@ -103,8 +105,11 @@ }; struct QueueAndSerial { - Ref<QueueBase> queue; + WeakRef<QueueBase> queue; ExecutionSerial completionSerial; + + // Returns the most recently completed serial on |queue|. Otherwise, returns |completionSerial|. + ExecutionSerial GetCompletedSerial() const; }; // Base class for the objects that back WGPUFutures. TrackedEvent is responsible for the lifetime
diff --git a/src/dawn/native/ExecutionQueue.cpp b/src/dawn/native/ExecutionQueue.cpp index 75e9822..6fbb56a 100644 --- a/src/dawn/native/ExecutionQueue.cpp +++ b/src/dawn/native/ExecutionQueue.cpp
@@ -28,6 +28,7 @@ #include "dawn/native/ExecutionQueue.h" #include <atomic> +#include <utility> namespace dawn::native { @@ -50,13 +51,31 @@ DAWN_ASSERT(completedSerial <= ExecutionSerial(mLastSubmittedSerial.load(std::memory_order_acquire))); + UpdateCompletedSerial(completedSerial); + return {}; +} + +void ExecutionQueueBase::TrackSerialTask(ExecutionSerial serial, Task&& task) { + if (serial <= GetCompletedCommandSerial()) { + task(); + return; + } + mWaitingTasks->Enqueue(std::move(task), serial); +} + +void ExecutionQueueBase::UpdateCompletedSerial(ExecutionSerial completedSerial) { // Atomically set mCompletedSerial to completedSerial if completedSerial is larger. uint64_t current = mCompletedSerial.load(std::memory_order_acquire); while (uint64_t(completedSerial) > current && !mCompletedSerial.compare_exchange_weak(current, uint64_t(completedSerial), std::memory_order_acq_rel)) { } - return {}; + mWaitingTasks.Use([&](auto tasks) { + for (auto task : tasks->IterateUpTo(completedSerial)) { + task(); + } + tasks->ClearUpTo(completedSerial); + }); } MaybeError ExecutionQueueBase::EnsureCommandsFlushed(ExecutionSerial serial) { @@ -73,8 +92,9 @@ // Bump serials so any pending callbacks can be fired. // TODO(crbug.com/dawn/831): This is called during device destroy, which is not // thread-safe yet. Two threads calling destroy would race setting these serials. - uint64_t prev = mLastSubmittedSerial.fetch_add(1u, std::memory_order_release); - mCompletedSerial.store(prev + 1u, std::memory_order_release); + ExecutionSerial completed = + ExecutionSerial(mLastSubmittedSerial.fetch_add(1u, std::memory_order_release) + 1); + UpdateCompletedSerial(completed); } void ExecutionQueueBase::IncrementLastSubmittedCommandSerial() {
diff --git a/src/dawn/native/ExecutionQueue.h b/src/dawn/native/ExecutionQueue.h index 5bd86ff..c52908c 100644 --- a/src/dawn/native/ExecutionQueue.h +++ b/src/dawn/native/ExecutionQueue.h
@@ -29,10 +29,13 @@ #define SRC_DAWN_NATIVE_EXECUTIONQUEUE_H_ #include <atomic> +#include <functional> +#include "dawn/common/MutexProtected.h" +#include "dawn/common/SerialMap.h" #include "dawn/native/Error.h" -#include "dawn/native/EventManager.h" #include "dawn/native/IntegerTypes.h" +#include "partition_alloc/pointers/raw_ptr.h" namespace dawn::native { @@ -42,6 +45,8 @@ // only partially safe - where observation of the last-submitted and pending serials is atomic. class ExecutionQueueBase { public: + using Task = std::function<void()>; + // The latest serial known to have completed execution on the queue. ExecutionSerial GetCompletedCommandSerial() const; // The serial of the latest batch of work sent for execution. @@ -81,11 +86,21 @@ // if the serial passed. virtual ResultOrError<bool> WaitForQueueSerial(ExecutionSerial serial, Nanoseconds timeout) = 0; + // Tracks a new task to complete when |serial| is reached. + void TrackSerialTask(ExecutionSerial serial, Task&& task); + // In the 'Normal' mode, currently recorded commands in the backend submitted in the next Tick. // However in the 'Passive' mode, the submission will be postponed as late as possible, for // example, until the client has explictly issued a submission. enum class SubmitMode { Normal, Passive }; + protected: + // Currently, the queue has two paths for serial updating, one is via DeviceBase::Tick which + // calls into the backend specific polling mechanisms implemented in + // CheckAndUpdateCompletedSerials. Alternatively, the backend can actively call + // UpdateCompletedSerial when a new serial is complete to make forward progress proactively. + void UpdateCompletedSerial(ExecutionSerial completedSerial); + private: // Each backend should implement to check their passed fences if there are any and return a // completed serial. Return 0 should indicate no fences to check. @@ -97,6 +112,8 @@ std::atomic<uint64_t> mCompletedSerial = static_cast<uint64_t>(kBeginningOfGPUTime); std::atomic<uint64_t> mLastSubmittedSerial = static_cast<uint64_t>(kBeginningOfGPUTime); + MutexProtected<SerialMap<ExecutionSerial, Task>> mWaitingTasks; + // Indicates whether the backend has pending commands to be submitted as soon as possible. virtual bool HasPendingCommands() const = 0;
diff --git a/src/dawn/native/Queue.cpp b/src/dawn/native/Queue.cpp index ef30378..393154a 100644 --- a/src/dawn/native/Queue.cpp +++ b/src/dawn/native/Queue.cpp
@@ -196,7 +196,7 @@ wgpu::QueueWorkDoneStatus earlyStatus) : TrackedEvent(static_cast<wgpu::CallbackMode>(callbackInfo.mode), queue, - kBeginningOfGPUTime), + queue->GetCompletedCommandSerial()), mEarlyStatus(ToAPI(earlyStatus)), mCallback(callbackInfo.callback), mUserdata1(callbackInfo.userdata1), @@ -226,8 +226,6 @@ // TODO(crbug.com/dawn/831) Manually acquire device lock instead of relying on code-gen for // re-entrancy. auto deviceLock(GetDevice()->GetScopedLock()); - - // Note: if the callback is spontaneous, it may get called in here. if (GetDevice()->ConsumedError(GetDevice()->ValidateIsAlive())) { event = AcquireRef( new WorkDoneEvent(callbackInfo, this, wgpu::QueueWorkDoneStatus::Success)); @@ -239,6 +237,7 @@ } } + // Note: if the callback is spontaneous, it may get called in here. FutureID futureID = GetInstance()->GetEventManager()->TrackEvent(std::move(event)); return {futureID};
diff --git a/src/dawn/native/Queue.h b/src/dawn/native/Queue.h index 1e166a8..8371e65 100644 --- a/src/dawn/native/Queue.h +++ b/src/dawn/native/Queue.h
@@ -32,6 +32,7 @@ #include "dawn/common/MutexProtected.h" #include "dawn/common/SerialMap.h" +#include "dawn/common/WeakRefSupport.h" #include "dawn/native/CallbackTaskManager.h" #include "dawn/native/Error.h" #include "dawn/native/ExecutionQueue.h" @@ -61,7 +62,9 @@ ExecutionSerial mSerial = kMaxExecutionSerial; }; -class QueueBase : public ApiObjectBase, public ExecutionQueueBase { +class QueueBase : public ApiObjectBase, + public ExecutionQueueBase, + public WeakRefSupport<QueueBase> { public: ~QueueBase() override; @@ -92,6 +95,7 @@ uint64_t bufferOffset, const void* data, size_t size); + // Ensure a flush occurs if needed, and track this task as complete after the // scheduled work is complete. void TrackTaskAfterEventualFlush(std::unique_ptr<TrackTaskCallback> task);
diff --git a/src/dawn/tests/end2end/EventTests.cpp b/src/dawn/tests/end2end/EventTests.cpp index 3d00661..b065371 100644 --- a/src/dawn/tests/end2end/EventTests.cpp +++ b/src/dawn/tests/end2end/EventTests.cpp
@@ -406,6 +406,8 @@ TEST_P(EventCompletionTests, WorkDoneDropInstanceBeforeEvent) { // TODO(crbug.com/dawn/1987): Wire does not implement instance destruction correctly yet. DAWN_TEST_UNSUPPORTED_IF(UsesWire()); + // TODO(crbug.com/412761228): Spontaneous events are not implemented correctly yet. + DAWN_TEST_UNSUPPORTED_IF(IsSpontaneous()); UseSecondInstance(); testInstance = nullptr; // Drop the last external ref to the instance. @@ -414,20 +416,14 @@ testQueue.OnSubmittedWorkDone(GetCallbackMode(), [&status](wgpu::QueueWorkDoneStatus result) { status = result; }); - if (IsSpontaneous()) { - // TODO(crbug.com/dawn/2059): Once Spontaneous is implemented, this should no longer expect - // the callback to be cleaned up immediately (and should expect it to happen on a future - // Tick). - ASSERT_THAT(status, AnyOf(Eq(wgpu::QueueWorkDoneStatus::Success), - Eq(wgpu::QueueWorkDoneStatus::CallbackCancelled))); - } else { - ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled); - } + ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled); } TEST_P(EventCompletionTests, WorkDoneDropInstanceAfterEvent) { // TODO(crbug.com/dawn/1987): Wire does not implement instance destruction correctly yet. DAWN_TEST_UNSUPPORTED_IF(UsesWire()); + // TODO(crbug.com/412761228): Spontaneous events are not implemented correctly yet. + DAWN_TEST_UNSUPPORTED_IF(IsSpontaneous()); UseSecondInstance(); @@ -435,19 +431,9 @@ testQueue.OnSubmittedWorkDone(GetCallbackMode(), [&status](wgpu::QueueWorkDoneStatus result) { status = result; }); - if (IsSpontaneous()) { - testInstance = nullptr; // Drop the last external ref to the instance. - - // TODO(crbug.com/dawn/2059): Once Spontaneous is implemented, this should no longer expect - // the callback to be cleaned up immediately (and should expect it to happen on a future - // Tick). - ASSERT_THAT(status, AnyOf(Eq(wgpu::QueueWorkDoneStatus::Success), - Eq(wgpu::QueueWorkDoneStatus::CallbackCancelled))); - } else { - ASSERT_EQ(status, kStatusUninitialized); - testInstance = nullptr; // Drop the last external ref to the instance. - ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled); - } + ASSERT_EQ(status, kStatusUninitialized); + testInstance = nullptr; // Drop the last external ref to the instance. + ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled); } // TODO(crbug.com/dawn/1987):