[dawn][native] Adds task tracking in ExecutionQueueBase.
- Implements the necessary tools in ExecutionQueueBase as described
in the [design doc](https://docs.google.com/document/d/1o_1SvkpnWq0ysOxU5jJLkWKjBrLG5eUHqmWQSjQiu2s/edit?usp=sharing).
- Uses WeakRefs for Queue-based events to avoid the ref-cycle:
QueueBase -[implements]->
ExecutionQueueBase -[owns]->
Task -[refs]->
TrackedEvent -[refs]->
QueueBase
- Much of the task tracking is based on the CallbackTaskManager,
which we can hopefully remove once more refactoring is done.
- Introduces a mechanism for the backend to proactively
update the completed serial while also tracking queue event
futures. This way, when the backend reports that a particular
serial has passed, we can flush the callbacks of all futures
waiting on serials up to that serial.
- Ensures that once the instance is dropped, all non-spontaneous
event callbacks are called.
- Disables some Spontaneous tests, since they no longer work
while the infra to enable AllowSpontaneous is only partially
implemented.
Bug: 412761228
Change-Id: I1650ab2ef1fa60837553d7bd3a5f6e0b4e7ad3b7
Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/239115
Reviewed-by: Corentin Wallez <cwallez@chromium.org>
Reviewed-by: Kai Ninomiya <kainino@chromium.org>
Commit-Queue: Loko Kung <lokokung@google.com>
diff --git a/src/dawn/native/Buffer.cpp b/src/dawn/native/Buffer.cpp
index 092ab2e..946e5a8 100644
--- a/src/dawn/native/Buffer.cpp
+++ b/src/dawn/native/Buffer.cpp
@@ -225,9 +225,11 @@
void Complete(EventCompletionType completionType) override {
if (const auto* queueAndSerial = GetIfQueueAndSerial()) {
- TRACE_EVENT_ASYNC_END0(queueAndSerial->queue->GetDevice()->GetPlatform(), General,
- "Buffer::APIMapAsync",
- uint64_t(queueAndSerial->completionSerial));
+ if (auto queue = queueAndSerial->queue.Promote()) {
+ TRACE_EVENT_ASYNC_END0(queue->GetDevice()->GetPlatform(), General,
+ "Buffer::APIMapAsync",
+ uint64_t(queueAndSerial->completionSerial));
+ }
}
void* userdata1 = mUserdata1.ExtractAsDangling();
diff --git a/src/dawn/native/EventManager.cpp b/src/dawn/native/EventManager.cpp
index 2780fe2..62ff4565 100644
--- a/src/dawn/native/EventManager.cpp
+++ b/src/dawn/native/EventManager.cpp
@@ -145,14 +145,21 @@
}
// Wait/poll queues with given `timeout`. `queueWaitSerials` should contain per queue, the serial up
-// to which we should flush the queue if needed.
-using QueueWaitSerialsMap = absl::flat_hash_map<QueueBase*, ExecutionSerial>;
+// to which we should flush the queue if needed. Note that keys are WeakRef<QueueBase> which
+// actually means the keys are not based on the QueueBase pointer, but a pointer to metadata that is
+// guaranteed to be unique and alive. This ensures that each queue will be represented for multi
+// source validation.
+using QueueWaitSerialsMap = absl::flat_hash_map<WeakRef<QueueBase>, ExecutionSerial>;
void WaitQueueSerials(const QueueWaitSerialsMap& queueWaitSerials, Nanoseconds timeout) {
// TODO(dawn:1662): Make error handling thread-safe.
// Poll/wait on queues up to the lowest wait serial, but do this once per queue instead of
// per event so that events with same serial complete at the same time instead of racing.
for (const auto& queueAndSerial : queueWaitSerials) {
- auto* queue = queueAndSerial.first;
+ auto queue = queueAndSerial.first.Promote();
+ if (queue == nullptr) {
+ // If we can't promote the queue, then all the work is already done.
+ continue;
+ }
auto waitSerial = queueAndSerial.second;
auto* device = queue->GetDevice();
@@ -176,7 +183,7 @@
}
return {};
}(),
- "waiting for work in %s.", queue);
+ "waiting for work in %s.", queue.Get());
}
}
@@ -186,8 +193,8 @@
Nanoseconds timeout) {
bool foundSystemEvent = false;
bool foundWaitListEvent = false;
- QueueWaitSerialsMap queueLowestWaitSerials;
+ QueueWaitSerialsMap queueLowestWaitSerials;
for (const auto& future : futures) {
if (future.event->GetIfSystemEvent()) {
foundSystemEvent = true;
@@ -197,7 +204,7 @@
}
if (const auto* queueAndSerial = future.event->GetIfQueueAndSerial()) {
auto [it, inserted] = queueLowestWaitSerials.insert(
- {queueAndSerial->queue.Get(), queueAndSerial->completionSerial});
+ {queueAndSerial->queue, queueAndSerial->completionSerial});
if (!inserted) {
it->second = std::min(it->second, queueAndSerial->completionSerial);
}
@@ -295,7 +302,15 @@
}
void EventManager::ShutDown() {
- mEvents.Use([&](auto events) { (*events).reset(); });
+ mEvents.Use([&](auto events) {
+ // For all non-spontaneous events, call their callbacks now.
+ for (auto& [futureID, event] : **events) {
+ if (event->mCallbackMode != wgpu::CallbackMode::AllowSpontaneous) {
+ event->EnsureComplete(EventCompletionType::Shutdown);
+ }
+ }
+ (*events).reset();
+ });
}
bool EventManager::IsShutDown() const {
@@ -321,8 +336,29 @@
}
}
+ if (const auto* queueAndSerial = event->GetIfQueueAndSerial()) {
+ if (auto q = queueAndSerial->queue.Promote()) {
+ q->TrackSerialTask(queueAndSerial->completionSerial, [this, event]() {
+ // If this is executed, we can be sure that the raw pointer to this EventManager is
+ // valid because the Queue is alive and:
+ // Queue -[refs]->
+ // Device -[refs]->
+ // Adapter -[refs]->
+ // Instance -[owns]->
+ // EventManager.
+ SetFutureReady(event.Get());
+ });
+ }
+ }
+
mEvents.Use([&](auto events) {
if (!events->has_value()) {
+ // We are shutting down, so if the event isn't spontaneous, call the callback now.
+ if (event->mCallbackMode != wgpu::CallbackMode::AllowSpontaneous) {
+ event->EnsureComplete(EventCompletionType::Shutdown);
+ }
+ // Otherwise, in native, the event manager is not in charge of tracking the event, so
+ // just return early now.
return;
}
if (event->mCallbackMode != wgpu::CallbackMode::WaitAnyOnly) {
@@ -338,6 +374,8 @@
}
void EventManager::SetFutureReady(TrackedEvent* event) {
+ event->SetReadyToComplete();
+
// Sometimes, events might become ready before they are even tracked. This can happen because
// tracking is ordered to uphold callback ordering, but events may become ready in any order. If
// the event is spontaneous, it will be completed when it is tracked.
@@ -345,8 +383,6 @@
return;
}
- event->SetReadyToComplete();
-
// Handle spontaneous completion now.
if (event->mCallbackMode == wgpu::CallbackMode::AllowSpontaneous) {
mEvents.Use([&](auto events) {
@@ -502,6 +538,15 @@
return wgpu::WaitStatus::Success;
}
+// QueueAndSerial
+
+ExecutionSerial QueueAndSerial::GetCompletedSerial() const {
+ if (auto q = queue.Promote()) {
+ return q->GetCompletedCommandSerial();
+ }
+ return completionSerial;
+}
+
// EventManager::TrackedEvent
EventManager::TrackedEvent::TrackedEvent(wgpu::CallbackMode callbackMode,
@@ -515,7 +560,8 @@
EventManager::TrackedEvent::TrackedEvent(wgpu::CallbackMode callbackMode,
QueueBase* queue,
ExecutionSerial completionSerial)
- : mCallbackMode(callbackMode), mCompletionData(QueueAndSerial{queue, completionSerial}) {}
+ : mCallbackMode(callbackMode),
+ mCompletionData(QueueAndSerial{GetWeakRef(queue), completionSerial}) {}
EventManager::TrackedEvent::TrackedEvent(wgpu::CallbackMode callbackMode, Completed tag)
: mCallbackMode(callbackMode), mCompletionData(AcquireRef(new WaitListEvent())) {
@@ -545,8 +591,7 @@
isReady = event->IsSignaled();
}
if (const auto* queueAndSerial = GetIfQueueAndSerial()) {
- isReady =
- queueAndSerial->completionSerial <= queueAndSerial->queue->GetCompletedCommandSerial();
+ isReady = queueAndSerial->completionSerial <= queueAndSerial->GetCompletedSerial();
}
return isReady;
}
@@ -559,7 +604,7 @@
event->Signal();
}
if (auto* queueAndSerial = std::get_if<QueueAndSerial>(&mCompletionData)) {
- queueAndSerial->completionSerial = queueAndSerial->queue->GetCompletedCommandSerial();
+ queueAndSerial->completionSerial = queueAndSerial->GetCompletedSerial();
}
}
diff --git a/src/dawn/native/EventManager.h b/src/dawn/native/EventManager.h
index a6a4ec0..90ac919 100644
--- a/src/dawn/native/EventManager.h
+++ b/src/dawn/native/EventManager.h
@@ -39,9 +39,11 @@
#include "dawn/common/MutexProtected.h"
#include "dawn/common/NonMovable.h"
#include "dawn/common/Ref.h"
+#include "dawn/common/WeakRef.h"
#include "dawn/native/Error.h"
#include "dawn/native/Forward.h"
#include "dawn/native/IntegerTypes.h"
+#include "dawn/native/Queue.h"
#include "dawn/native/SystemEvent.h"
#include "dawn/native/WaitListEvent.h"
#include "partition_alloc/pointers/raw_ptr.h"
@@ -103,8 +105,11 @@
};
struct QueueAndSerial {
- Ref<QueueBase> queue;
+ WeakRef<QueueBase> queue;
ExecutionSerial completionSerial;
+
+ // Returns the most recently completed serial on |queue|. Otherwise, returns |completionSerial|.
+ ExecutionSerial GetCompletedSerial() const;
};
// Base class for the objects that back WGPUFutures. TrackedEvent is responsible for the lifetime
diff --git a/src/dawn/native/ExecutionQueue.cpp b/src/dawn/native/ExecutionQueue.cpp
index 75e9822..6fbb56a 100644
--- a/src/dawn/native/ExecutionQueue.cpp
+++ b/src/dawn/native/ExecutionQueue.cpp
@@ -28,6 +28,7 @@
#include "dawn/native/ExecutionQueue.h"
#include <atomic>
+#include <utility>
namespace dawn::native {
@@ -50,13 +51,31 @@
DAWN_ASSERT(completedSerial <=
ExecutionSerial(mLastSubmittedSerial.load(std::memory_order_acquire)));
+ UpdateCompletedSerial(completedSerial);
+ return {};
+}
+
+void ExecutionQueueBase::TrackSerialTask(ExecutionSerial serial, Task&& task) {
+ if (serial <= GetCompletedCommandSerial()) {
+ task();
+ return;
+ }
+ mWaitingTasks->Enqueue(std::move(task), serial);
+}
+
+void ExecutionQueueBase::UpdateCompletedSerial(ExecutionSerial completedSerial) {
// Atomically set mCompletedSerial to completedSerial if completedSerial is larger.
uint64_t current = mCompletedSerial.load(std::memory_order_acquire);
while (uint64_t(completedSerial) > current &&
!mCompletedSerial.compare_exchange_weak(current, uint64_t(completedSerial),
std::memory_order_acq_rel)) {
}
- return {};
+ mWaitingTasks.Use([&](auto tasks) {
+ for (auto task : tasks->IterateUpTo(completedSerial)) {
+ task();
+ }
+ tasks->ClearUpTo(completedSerial);
+ });
}
MaybeError ExecutionQueueBase::EnsureCommandsFlushed(ExecutionSerial serial) {
@@ -73,8 +92,9 @@
// Bump serials so any pending callbacks can be fired.
// TODO(crbug.com/dawn/831): This is called during device destroy, which is not
// thread-safe yet. Two threads calling destroy would race setting these serials.
- uint64_t prev = mLastSubmittedSerial.fetch_add(1u, std::memory_order_release);
- mCompletedSerial.store(prev + 1u, std::memory_order_release);
+ ExecutionSerial completed =
+ ExecutionSerial(mLastSubmittedSerial.fetch_add(1u, std::memory_order_release) + 1);
+ UpdateCompletedSerial(completed);
}
void ExecutionQueueBase::IncrementLastSubmittedCommandSerial() {
diff --git a/src/dawn/native/ExecutionQueue.h b/src/dawn/native/ExecutionQueue.h
index 5bd86ff..c52908c 100644
--- a/src/dawn/native/ExecutionQueue.h
+++ b/src/dawn/native/ExecutionQueue.h
@@ -29,10 +29,13 @@
#define SRC_DAWN_NATIVE_EXECUTIONQUEUE_H_
#include <atomic>
+#include <functional>
+#include "dawn/common/MutexProtected.h"
+#include "dawn/common/SerialMap.h"
#include "dawn/native/Error.h"
-#include "dawn/native/EventManager.h"
#include "dawn/native/IntegerTypes.h"
+#include "partition_alloc/pointers/raw_ptr.h"
namespace dawn::native {
@@ -42,6 +45,8 @@
// only partially safe - where observation of the last-submitted and pending serials is atomic.
class ExecutionQueueBase {
public:
+ using Task = std::function<void()>;
+
// The latest serial known to have completed execution on the queue.
ExecutionSerial GetCompletedCommandSerial() const;
// The serial of the latest batch of work sent for execution.
@@ -81,11 +86,21 @@
// if the serial passed.
virtual ResultOrError<bool> WaitForQueueSerial(ExecutionSerial serial, Nanoseconds timeout) = 0;
+ // Tracks a new task to complete when |serial| is reached.
+ void TrackSerialTask(ExecutionSerial serial, Task&& task);
+
// In the 'Normal' mode, currently recorded commands in the backend submitted in the next Tick.
// However in the 'Passive' mode, the submission will be postponed as late as possible, for
// example, until the client has explictly issued a submission.
enum class SubmitMode { Normal, Passive };
+ protected:
+ // Currently, the queue has two paths for serial updating, one is via DeviceBase::Tick which
+ // calls into the backend specific polling mechanisms implemented in
+ // CheckAndUpdateCompletedSerials. Alternatively, the backend can actively call
+ // UpdateCompletedSerial when a new serial is complete to make forward progress proactively.
+ void UpdateCompletedSerial(ExecutionSerial completedSerial);
+
private:
// Each backend should implement to check their passed fences if there are any and return a
// completed serial. Return 0 should indicate no fences to check.
@@ -97,6 +112,8 @@
std::atomic<uint64_t> mCompletedSerial = static_cast<uint64_t>(kBeginningOfGPUTime);
std::atomic<uint64_t> mLastSubmittedSerial = static_cast<uint64_t>(kBeginningOfGPUTime);
+ MutexProtected<SerialMap<ExecutionSerial, Task>> mWaitingTasks;
+
// Indicates whether the backend has pending commands to be submitted as soon as possible.
virtual bool HasPendingCommands() const = 0;
diff --git a/src/dawn/native/Queue.cpp b/src/dawn/native/Queue.cpp
index ef30378..393154a 100644
--- a/src/dawn/native/Queue.cpp
+++ b/src/dawn/native/Queue.cpp
@@ -196,7 +196,7 @@
wgpu::QueueWorkDoneStatus earlyStatus)
: TrackedEvent(static_cast<wgpu::CallbackMode>(callbackInfo.mode),
queue,
- kBeginningOfGPUTime),
+ queue->GetCompletedCommandSerial()),
mEarlyStatus(ToAPI(earlyStatus)),
mCallback(callbackInfo.callback),
mUserdata1(callbackInfo.userdata1),
@@ -226,8 +226,6 @@
// TODO(crbug.com/dawn/831) Manually acquire device lock instead of relying on code-gen for
// re-entrancy.
auto deviceLock(GetDevice()->GetScopedLock());
-
- // Note: if the callback is spontaneous, it may get called in here.
if (GetDevice()->ConsumedError(GetDevice()->ValidateIsAlive())) {
event = AcquireRef(
new WorkDoneEvent(callbackInfo, this, wgpu::QueueWorkDoneStatus::Success));
@@ -239,6 +237,7 @@
}
}
+ // Note: if the callback is spontaneous, it may get called in here.
FutureID futureID = GetInstance()->GetEventManager()->TrackEvent(std::move(event));
return {futureID};
diff --git a/src/dawn/native/Queue.h b/src/dawn/native/Queue.h
index 1e166a8..8371e65 100644
--- a/src/dawn/native/Queue.h
+++ b/src/dawn/native/Queue.h
@@ -32,6 +32,7 @@
#include "dawn/common/MutexProtected.h"
#include "dawn/common/SerialMap.h"
+#include "dawn/common/WeakRefSupport.h"
#include "dawn/native/CallbackTaskManager.h"
#include "dawn/native/Error.h"
#include "dawn/native/ExecutionQueue.h"
@@ -61,7 +62,9 @@
ExecutionSerial mSerial = kMaxExecutionSerial;
};
-class QueueBase : public ApiObjectBase, public ExecutionQueueBase {
+class QueueBase : public ApiObjectBase,
+ public ExecutionQueueBase,
+ public WeakRefSupport<QueueBase> {
public:
~QueueBase() override;
@@ -92,6 +95,7 @@
uint64_t bufferOffset,
const void* data,
size_t size);
+
// Ensure a flush occurs if needed, and track this task as complete after the
// scheduled work is complete.
void TrackTaskAfterEventualFlush(std::unique_ptr<TrackTaskCallback> task);
diff --git a/src/dawn/tests/end2end/EventTests.cpp b/src/dawn/tests/end2end/EventTests.cpp
index 3d00661..b065371 100644
--- a/src/dawn/tests/end2end/EventTests.cpp
+++ b/src/dawn/tests/end2end/EventTests.cpp
@@ -406,6 +406,8 @@
TEST_P(EventCompletionTests, WorkDoneDropInstanceBeforeEvent) {
// TODO(crbug.com/dawn/1987): Wire does not implement instance destruction correctly yet.
DAWN_TEST_UNSUPPORTED_IF(UsesWire());
+ // TODO(crbug.com/412761228): Spontaneous events are not implemented correctly yet.
+ DAWN_TEST_UNSUPPORTED_IF(IsSpontaneous());
UseSecondInstance();
testInstance = nullptr; // Drop the last external ref to the instance.
@@ -414,20 +416,14 @@
testQueue.OnSubmittedWorkDone(GetCallbackMode(),
[&status](wgpu::QueueWorkDoneStatus result) { status = result; });
- if (IsSpontaneous()) {
- // TODO(crbug.com/dawn/2059): Once Spontaneous is implemented, this should no longer expect
- // the callback to be cleaned up immediately (and should expect it to happen on a future
- // Tick).
- ASSERT_THAT(status, AnyOf(Eq(wgpu::QueueWorkDoneStatus::Success),
- Eq(wgpu::QueueWorkDoneStatus::CallbackCancelled)));
- } else {
- ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled);
- }
+ ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled);
}
TEST_P(EventCompletionTests, WorkDoneDropInstanceAfterEvent) {
// TODO(crbug.com/dawn/1987): Wire does not implement instance destruction correctly yet.
DAWN_TEST_UNSUPPORTED_IF(UsesWire());
+ // TODO(crbug.com/412761228): Spontaneous events are not implemented correctly yet.
+ DAWN_TEST_UNSUPPORTED_IF(IsSpontaneous());
UseSecondInstance();
@@ -435,19 +431,9 @@
testQueue.OnSubmittedWorkDone(GetCallbackMode(),
[&status](wgpu::QueueWorkDoneStatus result) { status = result; });
- if (IsSpontaneous()) {
- testInstance = nullptr; // Drop the last external ref to the instance.
-
- // TODO(crbug.com/dawn/2059): Once Spontaneous is implemented, this should no longer expect
- // the callback to be cleaned up immediately (and should expect it to happen on a future
- // Tick).
- ASSERT_THAT(status, AnyOf(Eq(wgpu::QueueWorkDoneStatus::Success),
- Eq(wgpu::QueueWorkDoneStatus::CallbackCancelled)));
- } else {
- ASSERT_EQ(status, kStatusUninitialized);
- testInstance = nullptr; // Drop the last external ref to the instance.
- ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled);
- }
+ ASSERT_EQ(status, kStatusUninitialized);
+ testInstance = nullptr; // Drop the last external ref to the instance.
+ ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled);
}
// TODO(crbug.com/dawn/1987):