[dawn][native] Adds task tracking in ExecutionQueueBase.

- Implements the necessary tools in ExecutionQueueBase as described
  in the [design doc](https://docs.google.com/document/d/1o_1SvkpnWq0ysOxU5jJLkWKjBrLG5eUHqmWQSjQiu2s/edit?usp=sharing).
- Uses WeakRefs for queue-based events to avoid the ref-cycle:
  QueueBase            -[implements]->
    ExecutionQueueBase -[owns]->
      Task             -[refs]->
        TrackedEvent   -[refs]->
          QueueBase
- Much of the task tracking is based on the CallbackTaskManager
  which we can hopefully remove once more refactoring is done.
- Introduces a mechanism for the backend to proactively
  update the completed serial while also tracking queue event
  futures. This way, when the backend signals that a particular
  serial has passed, we can flush all future callbacks up to that
  serial.
- Ensures that once the instance is dropped, all non-spontaneous
  event callbacks are called.
- Disables some Spontaneous tests, since they no longer work
  while the infra to enable AllowSpontaneous is only partially
  implemented.

Bug: 412761228
Change-Id: I1650ab2ef1fa60837553d7bd3a5f6e0b4e7ad3b7
Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/239115
Reviewed-by: Corentin Wallez <cwallez@chromium.org>
Reviewed-by: Kai Ninomiya <kainino@chromium.org>
Commit-Queue: Loko Kung <lokokung@google.com>
diff --git a/src/dawn/native/Buffer.cpp b/src/dawn/native/Buffer.cpp
index 092ab2e..946e5a8 100644
--- a/src/dawn/native/Buffer.cpp
+++ b/src/dawn/native/Buffer.cpp
@@ -225,9 +225,11 @@
 
     void Complete(EventCompletionType completionType) override {
         if (const auto* queueAndSerial = GetIfQueueAndSerial()) {
-            TRACE_EVENT_ASYNC_END0(queueAndSerial->queue->GetDevice()->GetPlatform(), General,
-                                   "Buffer::APIMapAsync",
-                                   uint64_t(queueAndSerial->completionSerial));
+            if (auto queue = queueAndSerial->queue.Promote()) {
+                TRACE_EVENT_ASYNC_END0(queue->GetDevice()->GetPlatform(), General,
+                                       "Buffer::APIMapAsync",
+                                       uint64_t(queueAndSerial->completionSerial));
+            }
         }
 
         void* userdata1 = mUserdata1.ExtractAsDangling();
diff --git a/src/dawn/native/EventManager.cpp b/src/dawn/native/EventManager.cpp
index 2780fe2..62ff4565 100644
--- a/src/dawn/native/EventManager.cpp
+++ b/src/dawn/native/EventManager.cpp
@@ -145,14 +145,21 @@
 }
 
 // Wait/poll queues with given `timeout`. `queueWaitSerials` should contain per queue, the serial up
-// to which we should flush the queue if needed.
-using QueueWaitSerialsMap = absl::flat_hash_map<QueueBase*, ExecutionSerial>;
+// to which we should flush the queue if needed. Note that the keys are WeakRef<QueueBase>, which
+// means the keys are based not on the QueueBase pointer itself but on a pointer to metadata that
+// is guaranteed to be unique and alive. This ensures that each queue is represented for
+// multi-source validation.
+using QueueWaitSerialsMap = absl::flat_hash_map<WeakRef<QueueBase>, ExecutionSerial>;
 void WaitQueueSerials(const QueueWaitSerialsMap& queueWaitSerials, Nanoseconds timeout) {
     // TODO(dawn:1662): Make error handling thread-safe.
     // Poll/wait on queues up to the lowest wait serial, but do this once per queue instead of
     // per event so that events with same serial complete at the same time instead of racing.
     for (const auto& queueAndSerial : queueWaitSerials) {
-        auto* queue = queueAndSerial.first;
+        auto queue = queueAndSerial.first.Promote();
+        if (queue == nullptr) {
+            // If we can't promote the queue, then all the work is already done.
+            continue;
+        }
         auto waitSerial = queueAndSerial.second;
 
         auto* device = queue->GetDevice();
@@ -176,7 +183,7 @@
                 }
                 return {};
             }(),
-            "waiting for work in %s.", queue);
+            "waiting for work in %s.", queue.Get());
     }
 }
 
@@ -186,8 +193,8 @@
                           Nanoseconds timeout) {
     bool foundSystemEvent = false;
     bool foundWaitListEvent = false;
-    QueueWaitSerialsMap queueLowestWaitSerials;
 
+    QueueWaitSerialsMap queueLowestWaitSerials;
     for (const auto& future : futures) {
         if (future.event->GetIfSystemEvent()) {
             foundSystemEvent = true;
@@ -197,7 +204,7 @@
         }
         if (const auto* queueAndSerial = future.event->GetIfQueueAndSerial()) {
             auto [it, inserted] = queueLowestWaitSerials.insert(
-                {queueAndSerial->queue.Get(), queueAndSerial->completionSerial});
+                {queueAndSerial->queue, queueAndSerial->completionSerial});
             if (!inserted) {
                 it->second = std::min(it->second, queueAndSerial->completionSerial);
             }
@@ -295,7 +302,15 @@
 }
 
 void EventManager::ShutDown() {
-    mEvents.Use([&](auto events) { (*events).reset(); });
+    mEvents.Use([&](auto events) {
+        // For all non-spontaneous events, call their callbacks now.
+        for (auto& [futureID, event] : **events) {
+            if (event->mCallbackMode != wgpu::CallbackMode::AllowSpontaneous) {
+                event->EnsureComplete(EventCompletionType::Shutdown);
+            }
+        }
+        (*events).reset();
+    });
 }
 
 bool EventManager::IsShutDown() const {
@@ -321,8 +336,29 @@
         }
     }
 
+    if (const auto* queueAndSerial = event->GetIfQueueAndSerial()) {
+        if (auto q = queueAndSerial->queue.Promote()) {
+            q->TrackSerialTask(queueAndSerial->completionSerial, [this, event]() {
+                // If this is executed, we can be sure that the raw pointer to this EventManager is
+                // valid because the Queue is alive and:
+                //   Queue -[refs]->
+                //     Device -[refs]->
+                //       Adapter -[refs]->
+                //         Instance -[owns]->
+                //           EventManager.
+                SetFutureReady(event.Get());
+            });
+        }
+    }
+
     mEvents.Use([&](auto events) {
         if (!events->has_value()) {
+            // We are shutting down, so if the event isn't spontaneous, call the callback now.
+            if (event->mCallbackMode != wgpu::CallbackMode::AllowSpontaneous) {
+                event->EnsureComplete(EventCompletionType::Shutdown);
+            }
+            // Otherwise, in native, the event manager is not in charge of tracking the event, so
+            // just return early now.
             return;
         }
         if (event->mCallbackMode != wgpu::CallbackMode::WaitAnyOnly) {
@@ -338,6 +374,8 @@
 }
 
 void EventManager::SetFutureReady(TrackedEvent* event) {
+    event->SetReadyToComplete();
+
     // Sometimes, events might become ready before they are even tracked. This can happen because
     // tracking is ordered to uphold callback ordering, but events may become ready in any order. If
     // the event is spontaneous, it will be completed when it is tracked.
@@ -345,8 +383,6 @@
         return;
     }
 
-    event->SetReadyToComplete();
-
     // Handle spontaneous completion now.
     if (event->mCallbackMode == wgpu::CallbackMode::AllowSpontaneous) {
         mEvents.Use([&](auto events) {
@@ -502,6 +538,15 @@
     return wgpu::WaitStatus::Success;
 }
 
+// QueueAndSerial
+
+ExecutionSerial QueueAndSerial::GetCompletedSerial() const {
+    if (auto q = queue.Promote()) {
+        return q->GetCompletedCommandSerial();
+    }
+    return completionSerial;
+}
+
 // EventManager::TrackedEvent
 
 EventManager::TrackedEvent::TrackedEvent(wgpu::CallbackMode callbackMode,
@@ -515,7 +560,8 @@
 EventManager::TrackedEvent::TrackedEvent(wgpu::CallbackMode callbackMode,
                                          QueueBase* queue,
                                          ExecutionSerial completionSerial)
-    : mCallbackMode(callbackMode), mCompletionData(QueueAndSerial{queue, completionSerial}) {}
+    : mCallbackMode(callbackMode),
+      mCompletionData(QueueAndSerial{GetWeakRef(queue), completionSerial}) {}
 
 EventManager::TrackedEvent::TrackedEvent(wgpu::CallbackMode callbackMode, Completed tag)
     : mCallbackMode(callbackMode), mCompletionData(AcquireRef(new WaitListEvent())) {
@@ -545,8 +591,7 @@
         isReady = event->IsSignaled();
     }
     if (const auto* queueAndSerial = GetIfQueueAndSerial()) {
-        isReady =
-            queueAndSerial->completionSerial <= queueAndSerial->queue->GetCompletedCommandSerial();
+        isReady = queueAndSerial->completionSerial <= queueAndSerial->GetCompletedSerial();
     }
     return isReady;
 }
@@ -559,7 +604,7 @@
         event->Signal();
     }
     if (auto* queueAndSerial = std::get_if<QueueAndSerial>(&mCompletionData)) {
-        queueAndSerial->completionSerial = queueAndSerial->queue->GetCompletedCommandSerial();
+        queueAndSerial->completionSerial = queueAndSerial->GetCompletedSerial();
     }
 }
 
diff --git a/src/dawn/native/EventManager.h b/src/dawn/native/EventManager.h
index a6a4ec0..90ac919 100644
--- a/src/dawn/native/EventManager.h
+++ b/src/dawn/native/EventManager.h
@@ -39,9 +39,11 @@
 #include "dawn/common/MutexProtected.h"
 #include "dawn/common/NonMovable.h"
 #include "dawn/common/Ref.h"
+#include "dawn/common/WeakRef.h"
 #include "dawn/native/Error.h"
 #include "dawn/native/Forward.h"
 #include "dawn/native/IntegerTypes.h"
+#include "dawn/native/Queue.h"
 #include "dawn/native/SystemEvent.h"
 #include "dawn/native/WaitListEvent.h"
 #include "partition_alloc/pointers/raw_ptr.h"
@@ -103,8 +105,11 @@
 };
 
 struct QueueAndSerial {
-    Ref<QueueBase> queue;
+    WeakRef<QueueBase> queue;
     ExecutionSerial completionSerial;
+
+    // Returns |queue|'s latest completed serial if it can be promoted, else |completionSerial|.
+    ExecutionSerial GetCompletedSerial() const;
 };
 
 // Base class for the objects that back WGPUFutures. TrackedEvent is responsible for the lifetime
diff --git a/src/dawn/native/ExecutionQueue.cpp b/src/dawn/native/ExecutionQueue.cpp
index 75e9822..6fbb56a 100644
--- a/src/dawn/native/ExecutionQueue.cpp
+++ b/src/dawn/native/ExecutionQueue.cpp
@@ -28,6 +28,7 @@
 #include "dawn/native/ExecutionQueue.h"
 
 #include <atomic>
+#include <utility>
 
 namespace dawn::native {
 
@@ -50,13 +51,31 @@
     DAWN_ASSERT(completedSerial <=
                 ExecutionSerial(mLastSubmittedSerial.load(std::memory_order_acquire)));
 
+    UpdateCompletedSerial(completedSerial);
+    return {};
+}
+
+void ExecutionQueueBase::TrackSerialTask(ExecutionSerial serial, Task&& task) {
+    if (serial <= GetCompletedCommandSerial()) {
+        task();
+        return;
+    }
+    mWaitingTasks->Enqueue(std::move(task), serial);
+}
+
+void ExecutionQueueBase::UpdateCompletedSerial(ExecutionSerial completedSerial) {
     // Atomically set mCompletedSerial to completedSerial if completedSerial is larger.
     uint64_t current = mCompletedSerial.load(std::memory_order_acquire);
     while (uint64_t(completedSerial) > current &&
            !mCompletedSerial.compare_exchange_weak(current, uint64_t(completedSerial),
                                                    std::memory_order_acq_rel)) {
     }
-    return {};
+    mWaitingTasks.Use([&](auto tasks) {
+        for (auto task : tasks->IterateUpTo(completedSerial)) {
+            task();
+        }
+        tasks->ClearUpTo(completedSerial);
+    });
 }
 
 MaybeError ExecutionQueueBase::EnsureCommandsFlushed(ExecutionSerial serial) {
@@ -73,8 +92,9 @@
     // Bump serials so any pending callbacks can be fired.
     // TODO(crbug.com/dawn/831): This is called during device destroy, which is not
     // thread-safe yet. Two threads calling destroy would race setting these serials.
-    uint64_t prev = mLastSubmittedSerial.fetch_add(1u, std::memory_order_release);
-    mCompletedSerial.store(prev + 1u, std::memory_order_release);
+    ExecutionSerial completed =
+        ExecutionSerial(mLastSubmittedSerial.fetch_add(1u, std::memory_order_release) + 1);
+    UpdateCompletedSerial(completed);
 }
 
 void ExecutionQueueBase::IncrementLastSubmittedCommandSerial() {
diff --git a/src/dawn/native/ExecutionQueue.h b/src/dawn/native/ExecutionQueue.h
index 5bd86ff..c52908c 100644
--- a/src/dawn/native/ExecutionQueue.h
+++ b/src/dawn/native/ExecutionQueue.h
@@ -29,10 +29,13 @@
 #define SRC_DAWN_NATIVE_EXECUTIONQUEUE_H_
 
 #include <atomic>
+#include <functional>
 
+#include "dawn/common/MutexProtected.h"
+#include "dawn/common/SerialMap.h"
 #include "dawn/native/Error.h"
-#include "dawn/native/EventManager.h"
 #include "dawn/native/IntegerTypes.h"
+#include "partition_alloc/pointers/raw_ptr.h"
 
 namespace dawn::native {
 
@@ -42,6 +45,8 @@
 // only partially safe - where observation of the last-submitted and pending serials is atomic.
 class ExecutionQueueBase {
   public:
+    using Task = std::function<void()>;
+
     // The latest serial known to have completed execution on the queue.
     ExecutionSerial GetCompletedCommandSerial() const;
     // The serial of the latest batch of work sent for execution.
@@ -81,11 +86,21 @@
     // if the serial passed.
     virtual ResultOrError<bool> WaitForQueueSerial(ExecutionSerial serial, Nanoseconds timeout) = 0;
 
+    // Tracks a new task to complete when |serial| is reached.
+    void TrackSerialTask(ExecutionSerial serial, Task&& task);
+
     // In the 'Normal' mode, currently recorded commands in the backend submitted in the next Tick.
     // However in the 'Passive' mode, the submission will be postponed as late as possible, for
     // example, until the client has explictly issued a submission.
     enum class SubmitMode { Normal, Passive };
 
+  protected:
+    // Currently, the queue has two paths for serial updating, one is via DeviceBase::Tick which
+    // calls into the backend specific polling mechanisms implemented in
+    // CheckAndUpdateCompletedSerials. Alternatively, the backend can actively call
+    // UpdateCompletedSerial when a new serial is complete to make forward progress proactively.
+    void UpdateCompletedSerial(ExecutionSerial completedSerial);
+
   private:
     // Each backend should implement to check their passed fences if there are any and return a
     // completed serial. Return 0 should indicate no fences to check.
@@ -97,6 +112,8 @@
     std::atomic<uint64_t> mCompletedSerial = static_cast<uint64_t>(kBeginningOfGPUTime);
     std::atomic<uint64_t> mLastSubmittedSerial = static_cast<uint64_t>(kBeginningOfGPUTime);
 
+    MutexProtected<SerialMap<ExecutionSerial, Task>> mWaitingTasks;
+
     // Indicates whether the backend has pending commands to be submitted as soon as possible.
     virtual bool HasPendingCommands() const = 0;
 
diff --git a/src/dawn/native/Queue.cpp b/src/dawn/native/Queue.cpp
index ef30378..393154a 100644
--- a/src/dawn/native/Queue.cpp
+++ b/src/dawn/native/Queue.cpp
@@ -196,7 +196,7 @@
                       wgpu::QueueWorkDoneStatus earlyStatus)
             : TrackedEvent(static_cast<wgpu::CallbackMode>(callbackInfo.mode),
                            queue,
-                           kBeginningOfGPUTime),
+                           queue->GetCompletedCommandSerial()),
               mEarlyStatus(ToAPI(earlyStatus)),
               mCallback(callbackInfo.callback),
               mUserdata1(callbackInfo.userdata1),
@@ -226,8 +226,6 @@
         // TODO(crbug.com/dawn/831) Manually acquire device lock instead of relying on code-gen for
         // re-entrancy.
         auto deviceLock(GetDevice()->GetScopedLock());
-
-        // Note: if the callback is spontaneous, it may get called in here.
         if (GetDevice()->ConsumedError(GetDevice()->ValidateIsAlive())) {
             event = AcquireRef(
                 new WorkDoneEvent(callbackInfo, this, wgpu::QueueWorkDoneStatus::Success));
@@ -239,6 +237,7 @@
         }
     }
 
+    // Note: if the callback is spontaneous, it may get called in here.
     FutureID futureID = GetInstance()->GetEventManager()->TrackEvent(std::move(event));
 
     return {futureID};
diff --git a/src/dawn/native/Queue.h b/src/dawn/native/Queue.h
index 1e166a8..8371e65 100644
--- a/src/dawn/native/Queue.h
+++ b/src/dawn/native/Queue.h
@@ -32,6 +32,7 @@
 
 #include "dawn/common/MutexProtected.h"
 #include "dawn/common/SerialMap.h"
+#include "dawn/common/WeakRefSupport.h"
 #include "dawn/native/CallbackTaskManager.h"
 #include "dawn/native/Error.h"
 #include "dawn/native/ExecutionQueue.h"
@@ -61,7 +62,9 @@
     ExecutionSerial mSerial = kMaxExecutionSerial;
 };
 
-class QueueBase : public ApiObjectBase, public ExecutionQueueBase {
+class QueueBase : public ApiObjectBase,
+                  public ExecutionQueueBase,
+                  public WeakRefSupport<QueueBase> {
   public:
     ~QueueBase() override;
 
@@ -92,6 +95,7 @@
                            uint64_t bufferOffset,
                            const void* data,
                            size_t size);
+
     // Ensure a flush occurs if needed, and track this task as complete after the
     // scheduled work is complete.
     void TrackTaskAfterEventualFlush(std::unique_ptr<TrackTaskCallback> task);
diff --git a/src/dawn/tests/end2end/EventTests.cpp b/src/dawn/tests/end2end/EventTests.cpp
index 3d00661..b065371 100644
--- a/src/dawn/tests/end2end/EventTests.cpp
+++ b/src/dawn/tests/end2end/EventTests.cpp
@@ -406,6 +406,8 @@
 TEST_P(EventCompletionTests, WorkDoneDropInstanceBeforeEvent) {
     // TODO(crbug.com/dawn/1987): Wire does not implement instance destruction correctly yet.
     DAWN_TEST_UNSUPPORTED_IF(UsesWire());
+    // TODO(crbug.com/412761228): Spontaneous events are not implemented correctly yet.
+    DAWN_TEST_UNSUPPORTED_IF(IsSpontaneous());
 
     UseSecondInstance();
     testInstance = nullptr;  // Drop the last external ref to the instance.
@@ -414,20 +416,14 @@
     testQueue.OnSubmittedWorkDone(GetCallbackMode(),
                                   [&status](wgpu::QueueWorkDoneStatus result) { status = result; });
 
-    if (IsSpontaneous()) {
-        // TODO(crbug.com/dawn/2059): Once Spontaneous is implemented, this should no longer expect
-        // the callback to be cleaned up immediately (and should expect it to happen on a future
-        // Tick).
-        ASSERT_THAT(status, AnyOf(Eq(wgpu::QueueWorkDoneStatus::Success),
-                                  Eq(wgpu::QueueWorkDoneStatus::CallbackCancelled)));
-    } else {
-        ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled);
-    }
+    ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled);
 }
 
 TEST_P(EventCompletionTests, WorkDoneDropInstanceAfterEvent) {
     // TODO(crbug.com/dawn/1987): Wire does not implement instance destruction correctly yet.
     DAWN_TEST_UNSUPPORTED_IF(UsesWire());
+    // TODO(crbug.com/412761228): Spontaneous events are not implemented correctly yet.
+    DAWN_TEST_UNSUPPORTED_IF(IsSpontaneous());
 
     UseSecondInstance();
 
@@ -435,19 +431,9 @@
     testQueue.OnSubmittedWorkDone(GetCallbackMode(),
                                   [&status](wgpu::QueueWorkDoneStatus result) { status = result; });
 
-    if (IsSpontaneous()) {
-        testInstance = nullptr;  // Drop the last external ref to the instance.
-
-        // TODO(crbug.com/dawn/2059): Once Spontaneous is implemented, this should no longer expect
-        // the callback to be cleaned up immediately (and should expect it to happen on a future
-        // Tick).
-        ASSERT_THAT(status, AnyOf(Eq(wgpu::QueueWorkDoneStatus::Success),
-                                  Eq(wgpu::QueueWorkDoneStatus::CallbackCancelled)));
-    } else {
-        ASSERT_EQ(status, kStatusUninitialized);
-        testInstance = nullptr;  // Drop the last external ref to the instance.
-        ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled);
-    }
+    ASSERT_EQ(status, kStatusUninitialized);
+    testInstance = nullptr;  // Drop the last external ref to the instance.
+    ASSERT_EQ(status, wgpu::QueueWorkDoneStatus::CallbackCancelled);
 }
 
 // TODO(crbug.com/dawn/1987):