[dawn][wire] Implements timed WaitAny for Dawn wire client.

- Implements timed WaitAny via condition variables on the wire
  client. Since the wire client now spontaneously receives future
  completion tasks, we can use condition variables to notice
  changes in the state of the events, and fire the callbacks
  via WaitAny.
- Updates EventTests.cpp to test pretty much all possible callback
  mode and wait mechanism combinations, both on and off the wire.
- Updates other relevant tests that previously assumed the wire
  could not handle timed waits.
- Note that this change required refcounting the client future
  events since we use the presence of the event in the map as an
  indicator of whether both the event and its corresponding
  callback have completed. This means we can't remove the event
  from the map until the callback is finished, hence some of the
  changes.
- This change also updates the way that the TrackedEvents on the
  client handle a call to `Complete`. Since multiple threads
  could potentially call `Complete`, we need to ensure that only
  one thread will ever really complete the callback.

Bug: 441981783
Change-Id: I45ad9507f558860ceb7284ee9487e880e6e868ac
Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/260461
Reviewed-by: Corentin Wallez <cwallez@chromium.org>
Reviewed-by: Kai Ninomiya <kainino@chromium.org>
Commit-Queue: Loko Kung <lokokung@google.com>
diff --git a/src/dawn/tests/DawnTest.cpp b/src/dawn/tests/DawnTest.cpp
index f8e1b9e..183de0c 100644
--- a/src/dawn/tests/DawnTest.cpp
+++ b/src/dawn/tests/DawnTest.cpp
@@ -437,15 +437,12 @@
     wgpu::InstanceDescriptor instanceDesc{};
     instanceDesc.nextInChain = &dawnInstanceDesc;
     std::vector<wgpu::InstanceFeatureName> features = {
-        wgpu::InstanceFeatureName::MultipleDevicesPerAdapter};
-    if (!UsesWire()) {
-        features.push_back(wgpu::InstanceFeatureName::TimedWaitAny);
-    }
+        wgpu::InstanceFeatureName::MultipleDevicesPerAdapter,
+        wgpu::InstanceFeatureName::TimedWaitAny};
     instanceDesc.requiredFeatureCount = features.size();
     instanceDesc.requiredFeatures = features.data();
 
-    auto instance = std::make_unique<native::Instance>(
-        reinterpret_cast<const WGPUInstanceDescriptor*>(&instanceDesc));
+    auto instance = std::make_unique<native::Instance>(&instanceDesc);
 
 #ifdef DAWN_ENABLE_BACKEND_OPENGLES
     if (GetEnvironmentVar("ANGLE_DEFAULT_PLATFORM").first.empty()) {
@@ -1129,6 +1126,10 @@
     return mBackendAdapter;
 }
 
+utils::WireHelper* DawnTestBase::GetWireHelper() const {
+    return mWireHelper.get();
+}
+
 std::vector<wgpu::FeatureName> DawnTestBase::GetRequiredFeatures() {
     return {};
 }
@@ -1298,6 +1299,9 @@
     // By default we enable all the WGSL language features (including experimental, testing and
     // unsafe ones) in the tests.
     WGPUInstanceDescriptor instanceDesc = {};
+    std::vector<WGPUInstanceFeatureName> features = {WGPUInstanceFeatureName_TimedWaitAny};
+    instanceDesc.requiredFeatureCount = features.size();
+    instanceDesc.requiredFeatures = features.data();
     WGPUDawnWireWGSLControl wgslControl;
     wgslControl.chain.sType = WGPUSType_DawnWireWGSLControl;
     wgslControl.enableExperimental = 1u;
diff --git a/src/dawn/tests/DawnTest.h b/src/dawn/tests/DawnTest.h
index 5f0a631..44a003a 100644
--- a/src/dawn/tests/DawnTest.h
+++ b/src/dawn/tests/DawnTest.h
@@ -670,6 +670,9 @@
     // Exposed device creation helper for tests to use when needing more than 1 device.
     wgpu::Device CreateDevice(std::string isolationKey = "");
 
+    // Get the WireHelper to assist in creating additional Instances when relevant in tests.
+    utils::WireHelper* GetWireHelper() const;
+
     // Called in SetUp() to get the features required to be enabled in the tests. The tests must
     // check if the required features are supported by the adapter in this function and guarantee
     // the returned features are all supported by the adapter. The tests may provide different
diff --git a/src/dawn/tests/end2end/BasicTests.cpp b/src/dawn/tests/end2end/BasicTests.cpp
index b1f73b6..59bb53a 100644
--- a/src/dawn/tests/end2end/BasicTests.cpp
+++ b/src/dawn/tests/end2end/BasicTests.cpp
@@ -25,6 +25,7 @@
 // OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
+#include <limits>
 #include <unordered_set>
 
 #include "dawn/common/FutureUtils.h"
@@ -80,7 +81,12 @@
     wgpu::InstanceLimits out{};
     auto status = wgpu::GetInstanceLimits(&out);
     EXPECT_EQ(status, wgpu::Status::Success);
-    EXPECT_EQ(out.timedWaitAnyMaxCount, kTimedWaitAnyMaxCountDefault);
+    if (UsesWire()) {
+        // The wire's limit is just the max size value.
+        EXPECT_EQ(out.timedWaitAnyMaxCount, std::numeric_limits<size_t>::max());
+    } else {
+        EXPECT_EQ(out.timedWaitAnyMaxCount, kTimedWaitAnyMaxCountDefault);
+    }
     EXPECT_EQ(out.nextInChain, nullptr);
 
     wgpu::ChainedStructOut chained{};
@@ -102,8 +108,11 @@
     auto features = std::unordered_set(out.features, out.features + out.featureCount);
 
     if (UsesWire()) {
-        // Wire exposes no features because it doesn't support CreateInstance.
-        EXPECT_EQ(features.size(), 0u);
+        // Wire exposes a subset of features.
+        static const auto kWireFeatures = std::unordered_set{
+            wgpu::InstanceFeatureName::TimedWaitAny,
+        };
+        EXPECT_EQ(features, kWireFeatures);
     } else {
         // Native (currently) exposes all known features.
         EXPECT_EQ(features, kKnownFeatures);
diff --git a/src/dawn/tests/end2end/EventTests.cpp b/src/dawn/tests/end2end/EventTests.cpp
index e775b0a..7cbb890 100644
--- a/src/dawn/tests/end2end/EventTests.cpp
+++ b/src/dawn/tests/end2end/EventTests.cpp
@@ -39,6 +39,7 @@
 #include "dawn/tests/DawnTest.h"
 #include "dawn/utils/SystemUtils.h"
 #include "dawn/utils/WGPUHelpers.h"
+#include "dawn/utils/WireHelper.h"
 
 namespace dawn {
 namespace {
@@ -46,7 +47,7 @@
 using testing::AnyOf;
 using testing::Eq;
 
-wgpu::Device CreateExtraDevice(wgpu::Instance instance) {
+wgpu::Device CreateExtraDevice(utils::WireHelper* wireHelper, wgpu::Instance instance) {
     // IMPORTANT: DawnTest overrides RequestAdapter and RequestDevice and mixes
     // up the two instances. We use these to bypass the override.
     auto* requestAdapter = reinterpret_cast<WGPUProcInstanceRequestAdapter>(
@@ -54,6 +55,8 @@
     auto* requestDevice = reinterpret_cast<WGPUProcAdapterRequestDevice>(
         wgpu::GetProcAddress("wgpuAdapterRequestDevice"));
 
+    bool flushSuccess = false;
+
     wgpu::Adapter adapter2;
     requestAdapter(instance.Get(), nullptr,
                    {nullptr, WGPUCallbackMode_AllowSpontaneous,
@@ -62,6 +65,8 @@
                         *reinterpret_cast<wgpu::Adapter*>(result) = wgpu::Adapter::Acquire(adapter);
                     },
                     nullptr, &adapter2});
+    flushSuccess = wireHelper->FlushClient();
+    DAWN_ASSERT(flushSuccess);
     DAWN_ASSERT(adapter2);
 
     wgpu::Device device2;
@@ -73,15 +78,18 @@
                        *reinterpret_cast<wgpu::Device*>(result) = wgpu::Device::Acquire(device);
                    },
                    nullptr, &device2});
+    flushSuccess = wireHelper->FlushClient();
+    DAWN_ASSERT(flushSuccess);
     DAWN_ASSERT(device2);
 
     return device2;
 }
 
-std::pair<wgpu::Instance, wgpu::Device> CreateExtraInstance(wgpu::InstanceDescriptor* desc) {
-    wgpu::Instance instance2 = wgpu::CreateInstance(desc);
+std::pair<wgpu::Instance, wgpu::Device> CreateExtraInstance(utils::WireHelper* wireHelper,
+                                                            wgpu::InstanceDescriptor* desc) {
+    auto [instance2, nativeInstance] = wireHelper->CreateInstances(desc, desc);
 
-    wgpu::Device device2 = CreateExtraDevice(instance2);
+    wgpu::Device device2 = CreateExtraDevice(wireHelper, instance2);
     DAWN_ASSERT(device2);
 
     return std::pair(std::move(instance2), std::move(device2));
@@ -140,14 +148,18 @@
     void SetUp() override {
         DawnTestWithParams::SetUp();
         WaitTypeAndCallbackMode mode = GetParam().mWaitTypeAndCallbackMode;
-        if (UsesWire()) {
-            DAWN_TEST_UNSUPPORTED_IF(mode == WaitTypeAndCallbackMode::TimedWaitAny_WaitAnyOnly ||
-                                     mode ==
-                                         WaitTypeAndCallbackMode::TimedWaitAny_AllowSpontaneous ||
-                                     mode == WaitTypeAndCallbackMode::Spin_AllowSpontaneous);
-        }
+        // TODO(crbug.com/412761228): Once spontaneous events are supported in the other
+        // backends, enable relevant tests for them as well.
         if (!IsMetal()) {
+            // Spontaneous is only supported on Metal at the moment.
             DAWN_TEST_UNSUPPORTED_IF(mode == WaitTypeAndCallbackMode::Spin_AllowSpontaneous);
+            if (UsesWire()) {
+                // Timed wait any is only supported on the wire if the native backend supports
+                // spontaneous.
+                DAWN_TEST_UNSUPPORTED_IF(
+                    mode == WaitTypeAndCallbackMode::TimedWaitAny_WaitAnyOnly ||
+                    mode == WaitTypeAndCallbackMode::TimedWaitAny_AllowSpontaneous);
+            }
         }
         testInstance = GetInstance();
         testDevice = device;
@@ -159,13 +171,11 @@
 
     void UseSecondInstance() {
         wgpu::InstanceDescriptor desc;
-        if (!UsesWire()) {
             static constexpr auto kTimedWaitAny = wgpu::InstanceFeatureName::TimedWaitAny;
             desc.requiredFeatureCount = 1;
             desc.requiredFeatures = &kTimedWaitAny;
-        }
-        std::tie(testInstance, testDevice) = CreateExtraInstance(&desc);
-        testQueue = testDevice.GetQueue();
+            std::tie(testInstance, testDevice) = CreateExtraInstance(GetWireHelper(), &desc);
+            testQueue = testDevice.GetQueue();
     }
 
     void LoseTestDevice() {
@@ -270,10 +280,10 @@
                 // Loop at least once so we can test it with 0 futures.
                 do {
                     ASSERT_FALSE(testTimeExceeded());
-                    DAWN_ASSERT(!UsesWire());
                     wgpu::WaitStatus status;
 
                     uint64_t oldCompletionCount = mCallbacksCompletedCount;
+                    FlushWire();
                     // Any futures should succeed within a few milliseconds at most.
                     status = testInstance.WaitAny(mFutures.size(), mFutures.data(), UINT64_MAX);
                     ASSERT_EQ(status, wgpu::WaitStatus::Success);
@@ -331,6 +341,7 @@
             case WaitType::Spin: {
                 do {
                     ASSERT_FALSE(testTimeExceeded());
+                    FlushWire();
                     utils::USleep(100);
                 } while (mCallbacksCompletedCount < mCallbacksIssuedCount);
             } break;
@@ -504,17 +515,10 @@
     wgpu::Instance instance2;
     wgpu::Device device2;
 
-    if (UsesWire()) {
-        // The wire (currently) never supports TimedWaitAny, so we can run this test on the
-        // default instance/device.
-        instance2 = GetInstance();
-        device2 = device;
-    } else {
-        // When not using the wire, DawnTest will unconditionally enable TimedWaitAny since it's
-        // useful for other tests. For this test, we need it to be false to test validation.
-        wgpu::InstanceDescriptor desc;
-        std::tie(instance2, device2) = CreateExtraInstance(&desc);
-    }
+    // DawnTest unconditionally enables TimedWaitAny (both on and off the wire) since it's
+    // useful for other tests. For this test, we need it to be false to test validation.
+    wgpu::InstanceDescriptor desc;
+    std::tie(instance2, device2) = CreateExtraInstance(GetWireHelper(), &desc);
 
     // UnsupportedTimeout is still validated if no futures are passed.
     for (uint64_t timeout : {uint64_t(1), uint64_t(0), UINT64_MAX}) {
@@ -538,42 +542,26 @@
 }
 
 TEST_P(WaitAnyTests, UnsupportedCount) {
-    wgpu::Instance instance2;
-    wgpu::Device device2;
-    wgpu::Queue queue2;
-
-    if (UsesWire()) {
-        // The wire (currently) never supports TimedWaitAny, so we can run this test on the
-        // default instance/device.
-        instance2 = GetInstance();
-        device2 = device;
-        queue2 = queue;
-    } else {
-        wgpu::InstanceDescriptor desc;
-        static constexpr auto kTimedWaitAny = wgpu::InstanceFeatureName::TimedWaitAny;
-        desc.requiredFeatureCount = 1;
-        desc.requiredFeatures = &kTimedWaitAny;
-        std::tie(instance2, device2) = CreateExtraInstance(&desc);
-        queue2 = device2.GetQueue();
-    }
-
     for (uint64_t timeout : {uint64_t(0), uint64_t(1)}) {
+        // TODO(crbug.com/443308345): D3D11's delay flush queue doesn't seem to work if we are only
+        // using a small timeout. It works if we increase the timeout to 0.2-0.3ms, but it should
+        // really be immediate.
+        DAWN_TEST_UNSUPPORTED_IF(timeout > 0 && HasToggleEnabled("d3d11_delay_flush_to_gpu"));
+
         // We don't support values higher than the default (64), and if you ask for lower than 64
         // you still get 64. DawnTest doesn't request anything (so requests 0) so gets 64.
         for (size_t count : {kTimedWaitAnyMaxCountDefault, kTimedWaitAnyMaxCountDefault + 1}) {
             std::vector<wgpu::FutureWaitInfo> infos;
             for (size_t i = 0; i < count; ++i) {
-                infos.push_back({queue2.OnSubmittedWorkDone(
+                infos.push_back({queue.OnSubmittedWorkDone(
                     wgpu::CallbackMode::WaitAnyOnly,
                     [](wgpu::QueueWorkDoneStatus, wgpu::StringView) {})});
             }
-            wgpu::WaitStatus status = instance2.WaitAny(infos.size(), infos.data(), timeout);
+            FlushWire();
+            wgpu::WaitStatus status = instance.WaitAny(infos.size(), infos.data(), timeout);
             if (timeout == 0) {
                 ASSERT_TRUE(status == wgpu::WaitStatus::Success ||
                             status == wgpu::WaitStatus::TimedOut);
-            } else if (UsesWire()) {
-                // Wire doesn't support timeouts at all.
-                ASSERT_EQ(status, wgpu::WaitStatus::Error);
             } else if (count <= 64) {
                 ASSERT_EQ(status, wgpu::WaitStatus::Success);
             } else {
@@ -584,45 +572,28 @@
 }
 
 TEST_P(WaitAnyTests, UnsupportedMixedSources) {
-    wgpu::Instance instance2;
-    wgpu::Device device2;
-    wgpu::Queue queue2;
-    wgpu::Device device3;
-    wgpu::Queue queue3;
+    wgpu::Device device1 = device;
+    wgpu::Device device2 = CreateDevice();
 
-    if (UsesWire()) {
-        // The wire (currently) never supports TimedWaitAny, so we can run this test on the
-        // default instance/device.
-        instance2 = GetInstance();
-        device2 = device;
-        queue2 = queue;
-        device3 = CreateDevice();
-        queue3 = device3.GetQueue();
-    } else {
-        wgpu::InstanceDescriptor desc;
-        static constexpr auto kTimedWaitAny = wgpu::InstanceFeatureName::TimedWaitAny;
-        desc.requiredFeatureCount = 1;
-        desc.requiredFeatures = &kTimedWaitAny;
-        std::tie(instance2, device2) = CreateExtraInstance(&desc);
-        queue2 = device2.GetQueue();
-        device3 = CreateExtraDevice(instance2);
-        queue3 = device3.GetQueue();
-    }
+    wgpu::Queue queue1 = queue;
+    wgpu::Queue queue2 = device2.GetQueue();
 
     for (uint64_t timeout : {uint64_t(0), uint64_t(1)}) {
         std::vector<wgpu::FutureWaitInfo> infos{{
+            {queue1.OnSubmittedWorkDone(wgpu::CallbackMode::WaitAnyOnly,
+                                        [](wgpu::QueueWorkDoneStatus, wgpu::StringView) {})},
             {queue2.OnSubmittedWorkDone(wgpu::CallbackMode::WaitAnyOnly,
                                         [](wgpu::QueueWorkDoneStatus, wgpu::StringView) {})},
-            {queue3.OnSubmittedWorkDone(wgpu::CallbackMode::WaitAnyOnly,
-                                        [](wgpu::QueueWorkDoneStatus, wgpu::StringView) {})},
         }};
-        wgpu::WaitStatus status = instance2.WaitAny(infos.size(), infos.data(), timeout);
+        FlushWire();
+        wgpu::WaitStatus status = instance.WaitAny(infos.size(), infos.data(), timeout);
         if (timeout == 0) {
             ASSERT_TRUE(status == wgpu::WaitStatus::Success ||
                         status == wgpu::WaitStatus::TimedOut);
         } else if (UsesWire()) {
-            // Wire doesn't support timeouts at all.
-            ASSERT_EQ(status, wgpu::WaitStatus::Error);
+            // Wire supports mixed source waiting.
+            ASSERT_TRUE(status == wgpu::WaitStatus::Success ||
+                        status == wgpu::WaitStatus::TimedOut);
         } else {
             ASSERT_EQ(status, wgpu::WaitStatus::Error);
         }
@@ -632,8 +603,10 @@
 // Test that submitting multiple heavy works then waiting one by one works.
 // This is a regression test for crbug.com/dawn/415561579
 TEST_P(WaitAnyTests, WaitHeavyWorksOneByOne) {
-    // Wire doesn't support timeouts.
-    DAWN_TEST_UNSUPPORTED_IF(UsesWire());
+    // Wire doesn't support timeouts unless it's the Metal backend.
+    // TODO(crbug.com/412761228): Once spontaneous events are supported in the other backends,
+    // enable this test for them as well.
+    DAWN_TEST_UNSUPPORTED_IF(UsesWire() && !IsMetal());
 
     wgpu::Buffer countBuffer;
     wgpu::Buffer ssbo;
@@ -688,6 +661,7 @@
         future = queue.OnSubmittedWorkDone(wgpu::CallbackMode::WaitAnyOnly,
                                            [](wgpu::QueueWorkDoneStatus, wgpu::StringView) {});
     }
+    FlushWire();
 
     for (const auto& future : futures) {
         wgpu::WaitStatus status = instance.WaitAny(future, UINT64_MAX);
diff --git a/src/dawn/wire/client/Adapter.cpp b/src/dawn/wire/client/Adapter.cpp
index e1debff..d0edc8e 100644
--- a/src/dawn/wire/client/Adapter.cpp
+++ b/src/dawn/wire/client/Adapter.cpp
@@ -289,7 +289,7 @@
     Client* client = GetClient();
     Ref<Device> device = client->Make<Device>(GetEventManagerHandle(), this, descriptor);
     auto [futureIDInternal, tracked] =
-        GetEventManager().TrackEvent(std::make_unique<RequestDeviceEvent>(callbackInfo, device));
+        GetEventManager().TrackEvent(AcquireRef(new RequestDeviceEvent(callbackInfo, device)));
     if (!tracked) {
         return {futureIDInternal};
     }
diff --git a/src/dawn/wire/client/Buffer.cpp b/src/dawn/wire/client/Buffer.cpp
index 08adfa9..bdd6777 100644
--- a/src/dawn/wire/client/Buffer.cpp
+++ b/src/dawn/wire/client/Buffer.cpp
@@ -359,7 +359,7 @@
                                const WGPUBufferMapCallbackInfo& callbackInfo) {
     Client* client = GetClient();
     auto [futureIDInternal, tracked] =
-        GetEventManager().TrackEvent(std::make_unique<MapAsyncEvent>(callbackInfo, this));
+        GetEventManager().TrackEvent(AcquireRef(new MapAsyncEvent(callbackInfo, this)));
     if (!tracked) {
         return {futureIDInternal};
     }
diff --git a/src/dawn/wire/client/Client.cpp b/src/dawn/wire/client/Client.cpp
index f8cc63f..6d7c7ad 100644
--- a/src/dawn/wire/client/Client.cpp
+++ b/src/dawn/wire/client/Client.cpp
@@ -27,6 +27,8 @@
 
 #include "dawn/wire/client/Client.h"
 
+#include <algorithm>
+
 #include "dawn/common/Compiler.h"
 #include "dawn/common/StringViewUtils.h"
 #include "dawn/wire/client/Device.h"
@@ -121,8 +123,26 @@
         return {nullptr, {0, 0}};
     }
 
+    // Check for future related features and limits that are relevant to the EventManager.
+    bool enabledTimedWaitAny = false;
+    size_t timedWaitAnyMaxCount = 0;
+    if (descriptor) {
+        auto instanceFeatures =
+            std::span(descriptor->requiredFeatures, descriptor->requiredFeatureCount);
+        enabledTimedWaitAny =
+            std::find(instanceFeatures.begin(), instanceFeatures.end(),
+                      WGPUInstanceFeatureName_TimedWaitAny) != instanceFeatures.end();
+        if (enabledTimedWaitAny) {
+            if (descriptor->requiredLimits) {
+                timedWaitAnyMaxCount = descriptor->requiredLimits->timedWaitAnyMaxCount;
+            }
+            timedWaitAnyMaxCount = std::max(timedWaitAnyMaxCount, kTimedWaitAnyMaxCountDefault);
+        }
+    }
+
     // Reserve an EventManager for the given instance and make the association in the map.
-    mEventManagers.emplace(instance->GetWireHandle(this), std::make_unique<EventManager>());
+    mEventManagers.emplace(instance->GetWireHandle(this),
+                           std::make_unique<EventManager>(timedWaitAnyMaxCount));
 
     ReservedInstance result;
     result.handle = instance->GetWireHandle(this);
diff --git a/src/dawn/wire/client/Device.cpp b/src/dawn/wire/client/Device.cpp
index 4bea65b..e483089 100644
--- a/src/dawn/wire/client/Device.cpp
+++ b/src/dawn/wire/client/Device.cpp
@@ -261,7 +261,7 @@
     if (descriptor != nullptr && descriptor->deviceLostCallbackInfo.callback != nullptr) {
         deviceLostCallbackInfo = descriptor->deviceLostCallbackInfo;
     }
-    mDeviceLostInfo.event = std::make_unique<DeviceLostEvent>(deviceLostCallbackInfo, this);
+    mDeviceLostInfo.event = AcquireRef(new DeviceLostEvent(deviceLostCallbackInfo, this));
 
     mUncapturedErrorCallbackInfo = kDefaultUncapturedErrorCallbackInfo;
     if (descriptor != nullptr && descriptor->uncapturedErrorCallbackInfo.callback != nullptr) {
@@ -358,7 +358,7 @@
 WGPUFuture Device::APIPopErrorScope(const WGPUPopErrorScopeCallbackInfo& callbackInfo) {
     Client* client = GetClient();
     auto [futureIDInternal, tracked] =
-        GetEventManager().TrackEvent(std::make_unique<PopErrorScopeEvent>(callbackInfo));
+        GetEventManager().TrackEvent(AcquireRef(new PopErrorScopeEvent(callbackInfo)));
     if (!tracked) {
         return {futureIDInternal};
     }
@@ -429,7 +429,7 @@
     Client* client = GetClient();
     Ref<Pipeline> pipeline = client->Make<Pipeline>();
     auto [futureIDInternal, tracked] =
-        GetEventManager().TrackEvent(std::make_unique<Event>(callbackInfo, pipeline));
+        GetEventManager().TrackEvent(AcquireRef(new Event(callbackInfo, pipeline)));
     if (!tracked) {
         return {futureIDInternal};
     }
diff --git a/src/dawn/wire/client/Device.h b/src/dawn/wire/client/Device.h
index c4539e3..c97ef15 100644
--- a/src/dawn/wire/client/Device.h
+++ b/src/dawn/wire/client/Device.h
@@ -100,7 +100,7 @@
 
     struct DeviceLostInfo {
         FutureID futureID = kNullFutureID;
-        std::unique_ptr<TrackedEvent> event = nullptr;
+        Ref<TrackedEvent> event = nullptr;
     };
     DeviceLostInfo mDeviceLostInfo;
 
diff --git a/src/dawn/wire/client/EventManager.cpp b/src/dawn/wire/client/EventManager.cpp
index 3da58b7..18caebd 100644
--- a/src/dawn/wire/client/EventManager.cpp
+++ b/src/dawn/wire/client/EventManager.cpp
@@ -27,12 +27,14 @@
 
 #include <map>
 #include <optional>
+#include <span>
 #include <utility>
 #include <vector>
 
 #include "dawn/wire/client/EventManager.h"
 
 #include "dawn/common/Log.h"
+#include "dawn/common/Time.h"
 #include "dawn/wire/client/Client.h"
 
 namespace dawn::wire::client {
@@ -50,7 +52,7 @@
 }
 
 bool TrackedEvent::IsReady() const {
-    return mEventState == EventState::Ready;
+    return mEventState == EventState::Ready || mEventState == EventState::Running;
 }
 
 void TrackedEvent::SetReady() {
@@ -59,18 +61,59 @@
 }
 
 void TrackedEvent::Complete(FutureID futureID, EventCompletionType type) {
-    DAWN_ASSERT(mEventState != EventState::Complete);
-    CompleteImpl(futureID, type);
-    mEventState = EventState::Complete;
+    EventState state = mEventState.load(std::memory_order::acquire);
+
+    auto TryComplete = [&]() -> bool {
+        switch (state) {
+            case EventState::Pending:
+                // If we are completing a Pending future, it must be a shutdown.
+                DAWN_ASSERT(type == EventCompletionType::Shutdown);
+                [[fallthrough]];
+            case EventState::Ready: {
+                if (mEventState.compare_exchange_strong(state, EventState::Running,
+                                                        std::memory_order::acq_rel)) {
+                    // Only one thread is ever allowed to run |CompleteImpl|.
+                    CompleteImpl(futureID, type);
+                    mEventState = EventState::Complete;
+                    mEventState.notify_all();
+                    return true;
+                }
+                return false;
+            }
+            case EventState::Running: {
+                // Wait until the event is no longer in the |Running| state.
+                if (type == EventCompletionType::Shutdown) {
+                    // If we are running, but we see a completion type of Shutdown, we don't wait
+                    // for the state to transition because the shutdown could have been caused by
+                    // the callback. Instead, just return true.
+                    mEventState = EventState::Complete;
+                    return true;
+                }
+                mEventState.wait(EventState::Running, std::memory_order::acquire);
+                return true;
+            }
+            case EventState::Complete:
+                return true;
+        }
+    };
+
+    bool completed = false;
+    while (!completed) {
+        completed = TryComplete();
+    }
+    DAWN_ASSERT(mEventState == EventState::Complete);
 }
 
 // EventManager
 
+EventManager::EventManager(size_t timedWaitAnyMaxCount)
+    : mTimedWaitAnyMaxCount(timedWaitAnyMaxCount) {}
+
 EventManager::~EventManager() {
     TransitionTo(State::ClientDropped);
 }
 
-std::pair<FutureID, bool> EventManager::TrackEvent(std::unique_ptr<TrackedEvent> event) {
+std::pair<FutureID, bool> EventManager::TrackEvent(Ref<TrackedEvent>&& event) {
     if (!ValidateCallbackMode(event->GetCallbackMode())) {
         dawn::ErrorLog() << "Invalid callback mode: " << event->GetCallbackMode();
         return {kNullFutureID, false};
@@ -113,7 +156,7 @@
     mState = state;
 
     while (true) {
-        std::map<FutureID, std::unique_ptr<TrackedEvent>> events;
+        EventMap events;
         switch (state) {
             case State::InstanceDropped: {
                 mTrackedEvents.Use([&](auto trackedEvents) {
@@ -147,38 +190,70 @@
 }
 
 void EventManager::ProcessPollEvents() {
-    std::vector<std::pair<FutureID, std::unique_ptr<TrackedEvent>>> eventsToCompleteNow;
-    mTrackedEvents.Use([&](auto trackedEvents) {
-        for (auto it = trackedEvents->begin(); it != trackedEvents->end();) {
-            auto& event = it->second;
+    std::vector<std::pair<FutureID, Ref<TrackedEvent>>> eventsToComplete;
+    mTrackedEvents.ConstUse([&](auto trackedEvents) {
+        for (auto& [futureID, event] : *trackedEvents) {
             WGPUCallbackMode callbackMode = event->GetCallbackMode();
-            bool shouldRemove = (callbackMode == WGPUCallbackMode_AllowProcessEvents ||
-                                 callbackMode == WGPUCallbackMode_AllowSpontaneous) &&
-                                event->IsReady();
-            if (!shouldRemove) {
-                ++it;
-                continue;
+            if ((callbackMode == WGPUCallbackMode_AllowProcessEvents ||
+                 callbackMode == WGPUCallbackMode_AllowSpontaneous) &&
+                event->IsReady()) {
+                eventsToComplete.emplace_back(futureID, event);
             }
-            eventsToCompleteNow.emplace_back(it->first, std::move(event));
-            it = trackedEvents->erase(it);
         }
     });
 
     // Since events were initially stored and iterated from an ordered map, they must be ordered.
-    for (auto& [futureID, event] : eventsToCompleteNow) {
+    for (auto& [futureID, event] : eventsToComplete) {
         event->Complete(futureID, EventCompletionType::Ready);
     }
+
+    mTrackedEvents.Use([&](auto trackedEvents) {
+        for (auto& [futureID, _] : eventsToComplete) {
+            trackedEvents->erase(futureID);
+        }
+    });
 }
 
+namespace {
+bool UpdateAnyCompletedOrReady(std::span<WGPUFutureWaitInfo> waitInfos,
+                               const EventManager::EventMap& allEvents,
+                               EventManager::EventMap* eventsToComplete) {
+    DAWN_ASSERT(eventsToComplete->empty());
+
+    bool anyCompleted = false;
+    for (auto& waitInfo : waitInfos) {
+        auto it = allEvents.find(waitInfo.future.id);
+        if (it == allEvents.end()) {
+            waitInfo.completed = true;
+            anyCompleted = true;
+            continue;
+        }
+
+        auto& event = it->second;
+        if (event->IsReady()) {
+            waitInfo.completed = true;
+            anyCompleted = true;
+            eventsToComplete->emplace(it->first, event);
+        }
+    }
+
+    DAWN_ASSERT(eventsToComplete->empty() || anyCompleted);
+    return anyCompleted;
+}
+}  // anonymous namespace
+
 WGPUWaitStatus EventManager::WaitAny(size_t count, WGPUFutureWaitInfo* infos, uint64_t timeoutNS) {
-    // Validate for feature support.
     if (timeoutNS > 0) {
-        // Wire doesn't support timedWaitEnable (for now). (There's no UnsupportedCount or
-        // UnsupportedMixedSources validation here, because those only apply to timed waits.)
-        //
-        // TODO(crbug.com/dawn/1987): CreateInstance needs to validate timedWaitEnable was false.
-        dawn::ErrorLog() << "Dawn wire does not currently support timed waits.";
-        return WGPUWaitStatus_Error;
+        if (mTimedWaitAnyMaxCount == 0) {
+            dawn::ErrorLog() << "Instance only supports timed wait anys if "
+                                "WGPUInstanceFeatureName_TimedWaitAny is enabled.";
+            return WGPUWaitStatus_Error;
+        }
+        if (count > mTimedWaitAnyMaxCount) {
+            dawn::ErrorLog() << "Instance only supports up to (" << mTimedWaitAnyMaxCount
+                             << ") timed wait anys.";
+            return WGPUWaitStatus_Error;
+        }
     }
 
     if (count == 0) {
@@ -187,37 +262,31 @@
 
     // Since the user can specify the FutureIDs in any order, we need to use another ordered map
     // here to ensure that the result is ordered for JS event ordering.
-    std::map<FutureID, std::unique_ptr<TrackedEvent>> eventsToCompleteNow;
-    bool anyCompleted = false;
-    const FutureID firstInvalidFutureID = mNextFutureID;
-    mTrackedEvents.Use([&](auto trackedEvents) {
-        for (size_t i = 0; i < count; ++i) {
-            FutureID futureID = infos[i].future.id;
-            DAWN_ASSERT(futureID < firstInvalidFutureID);
-
-            auto it = trackedEvents->find(futureID);
-            if (it == trackedEvents->end()) {
-                infos[i].completed = true;
-                anyCompleted = true;
-                continue;
-            }
-
-            auto& event = it->second;
-            // Early update .completed, in prep to complete the callback if ready.
-            infos[i].completed = event->IsReady();
-            if (event->IsReady()) {
-                anyCompleted = true;
-                eventsToCompleteNow.emplace(it->first, std::move(event));
-                trackedEvents->erase(it);
-            }
+    auto waitInfos = std::span(infos, count);
+    EventMap eventsToComplete;
+    bool anyCompleted = mTrackedEvents.ConstUse([&](auto trackedEvents) {
+        if (UpdateAnyCompletedOrReady(waitInfos, *trackedEvents, &eventsToComplete)) {
+            return true;
         }
+        if (timeoutNS > 0) {
+            return trackedEvents.WaitFor(Nanoseconds(timeoutNS), [&](const EventMap& events) {
+                return UpdateAnyCompletedOrReady(waitInfos, events, &eventsToComplete);
+            });
+        }
+        return false;
     });
 
-    for (auto& [futureID, event] : eventsToCompleteNow) {
+    for (auto& [futureID, event] : eventsToComplete) {
         // .completed has already been set to true (before the callback, per API contract).
         event->Complete(futureID, EventCompletionType::Ready);
     }
 
+    mTrackedEvents.Use([&](auto trackedEvents) {
+        for (auto& [futureID, _] : eventsToComplete) {
+            trackedEvents->erase(futureID);
+        }
+    });
+
     return anyCompleted ? WGPUWaitStatus_Success : WGPUWaitStatus_TimedOut;
 }
 
diff --git a/src/dawn/wire/client/EventManager.h b/src/dawn/wire/client/EventManager.h
index 82003d4..983aa01 100644
--- a/src/dawn/wire/client/EventManager.h
+++ b/src/dawn/wire/client/EventManager.h
@@ -30,6 +30,7 @@
 
 #include <webgpu/webgpu.h>
 
+#include <atomic>
 #include <cstddef>
 #include <functional>
 #include <map>
@@ -38,7 +39,9 @@
 
 #include "dawn/common/FutureUtils.h"
 #include "dawn/common/MutexProtected.h"
-#include "dawn/common/NonCopyable.h"
+#include "dawn/common/NonMovable.h"
+#include "dawn/common/Ref.h"
+#include "dawn/common/RefCounted.h"
 #include "dawn/wire/WireResult.h"
 #include "partition_alloc/pointers/raw_ptr.h"
 
@@ -64,10 +67,10 @@
 // into a local copy that can be readily used by the user callback. Specifically, the wire
 // deserialization data is guaranteed to be alive when the ReadyHook is called, but not when
 // CompleteImpl is called.
-class TrackedEvent : NonMovable {
+class TrackedEvent : public RefCounted {
   public:
     explicit TrackedEvent(WGPUCallbackMode mode);
-    virtual ~TrackedEvent();
+    ~TrackedEvent() override;
 
     virtual EventType GetType() = 0;
 
@@ -84,9 +87,10 @@
     enum class EventState {
         Pending,
         Ready,
+        Running,
         Complete,
     };
-    EventState mEventState = EventState::Pending;
+    std::atomic<EventState> mEventState = EventState::Pending;
 };
 
 // Subcomponent which tracks callback events for the Future-based callback
@@ -96,6 +100,9 @@
 // TODO(crbug.com/dawn/2060): This should probably be merged together with RequestTracker.
 class EventManager final : NonMovable {
   public:
+    using EventMap = std::map<FutureID, Ref<TrackedEvent>>;
+
+    explicit EventManager(size_t timedWaitAnyMaxCount);
     ~EventManager();
 
     // See mState for breakdown of these states.
@@ -103,7 +110,7 @@
 
     // Returns a pair of the FutureID and a bool that is true iff the event was successfuly tracked,
     // false otherwise. Events may not be tracked if the client is already disconnected.
-    std::pair<FutureID, bool> TrackEvent(std::unique_ptr<TrackedEvent> event);
+    std::pair<FutureID, bool> TrackEvent(Ref<TrackedEvent>&& event);
 
     // Transitions the EventManager to the given state. Note that states can only go in one
     // direction, i.e. once the EventManager transitions to InstanceDropped, it cannot transition
@@ -117,7 +124,7 @@
             return WireResult::FatalError;
         }
 
-        std::unique_ptr<TrackedEvent> spontaneousEvent;
+        Ref<TrackedEvent> spontaneousEvent;
         WireResult result = mTrackedEvents.Use([&](auto trackedEvents) {
             auto it = trackedEvents->find(futureID);
             if (it == trackedEvents->end()) {
@@ -133,21 +140,22 @@
                 return WireResult::FatalError;
             }
 
-            WireResult result = static_cast<Event*>(trackedEvent.get())
+            WireResult result = static_cast<Event*>(trackedEvent.Get())
                                     ->ReadyHook(futureID, std::forward<ReadyArgs>(readyArgs)...);
             trackedEvent->SetReady();
 
             // If the event can be spontaneously completed, prepare to do so now.
             if (trackedEvent->GetCallbackMode() == WGPUCallbackMode_AllowSpontaneous) {
-                spontaneousEvent = std::move(trackedEvent);
-                trackedEvents->erase(futureID);
+                spontaneousEvent = trackedEvent;
             }
+
             return result;
         });
 
         // Handle spontaneous completions.
         if (spontaneousEvent) {
             spontaneousEvent->Complete(futureID, EventCompletionType::Ready);
+            mTrackedEvents.Use([&](auto trackedEvents) { trackedEvents->erase(futureID); });
         }
         return result;
     }
@@ -156,6 +164,8 @@
     WGPUWaitStatus WaitAny(size_t count, WGPUFutureWaitInfo* infos, uint64_t timeoutNS);
 
   private:
+    const size_t mTimedWaitAnyMaxCount = 0;
+
     // Different states of the EventManager dictate how new incoming events are handled.
     //   Nominal: Usual state of the manager. All events are tracked and callbacks are fired
     //     depending on the callback modes.
@@ -170,8 +180,10 @@
 
     // Tracks all kinds of events (for both WaitAny and ProcessEvents). We use an ordered map so
     // that in most cases, event ordering is already implicit when we iterate the map. (Not true for
-    // WaitAny though because the user could specify the FutureIDs out of order.)
-    MutexProtected<std::map<FutureID, std::unique_ptr<TrackedEvent>>> mTrackedEvents;
+    // WaitAny though because the user could specify the FutureIDs out of order.) The condition
+    // variable is used to implement timed WaitAny and should be notified whenever the map, or the
+    // state of any event inside the map, changes.
+    MutexCondVarProtected<EventMap> mTrackedEvents;
     std::atomic<FutureID> mNextFutureID = 1;
 };
 
diff --git a/src/dawn/wire/client/Instance.cpp b/src/dawn/wire/client/Instance.cpp
index 33a2c58..644c6ce 100644
--- a/src/dawn/wire/client/Instance.cpp
+++ b/src/dawn/wire/client/Instance.cpp
@@ -27,7 +27,9 @@
 
 #include "dawn/wire/client/Instance.h"
 
+#include <limits>
 #include <memory>
+#include <span>
 #include <string>
 #include <utility>
 
@@ -123,6 +125,9 @@
 
 }  // anonymous namespace
 
+static constexpr auto kSupportedFeatures =
+    std::array<WGPUInstanceFeatureName, 1>{WGPUInstanceFeatureName_TimedWaitAny};
+
 // Instance
 
 Instance::Instance(const ObjectBaseParams& params)
@@ -144,17 +149,26 @@
         return WireResult::Success;
     }
 
-    if (descriptor->requiredFeatureCount > 0) {
-        dawn::ErrorLog() << "Wire client doesn't support any WGPUInstanceFeatureName";
-        return WireResult::FatalError;
+    bool enabledTimedWaitAny = false;
+    for (auto feature : std::span(descriptor->requiredFeatures, descriptor->requiredFeatureCount)) {
+        if (std::find(kSupportedFeatures.begin(), kSupportedFeatures.end(), feature) ==
+            kSupportedFeatures.end()) {
+            dawn::ErrorLog() << "Wire client doesn't support WGPUInstanceFeatureName(" << feature
+                             << ")";
+            return WireResult::FatalError;
+        }
+        if (feature == WGPUInstanceFeatureName_TimedWaitAny) {
+            enabledTimedWaitAny = true;
+        }
     }
     if (descriptor->requiredLimits) {
         if (descriptor->requiredLimits->nextInChain != nullptr) {
             dawn::ErrorLog() << "Wire client doesn't support any WGPUInstanceLimits extensions";
             return WireResult::FatalError;
         }
-        if (descriptor->requiredLimits->timedWaitAnyMaxCount > 0) {
-            dawn::ErrorLog() << "Wire client doesn't support non-zero timedWaitAnyMaxCount";
+        if (!enabledTimedWaitAny && descriptor->requiredLimits->timedWaitAnyMaxCount > 0) {
+            dawn::ErrorLog() << "Wire client doesn't support non-zero timedWaitAnyMaxCount if "
+                                "WGPUInstanceFeatureName_TimedWaitAny is not enabled.";
             return WireResult::FatalError;
         }
     }
@@ -188,7 +202,7 @@
     Client* client = GetClient();
     Ref<Adapter> adapter = client->Make<Adapter>(GetEventManagerHandle());
     auto [futureIDInternal, tracked] =
-        GetEventManager().TrackEvent(std::make_unique<RequestAdapterEvent>(callbackInfo, adapter));
+        GetEventManager().TrackEvent(AcquireRef(new RequestAdapterEvent(callbackInfo, adapter)));
     if (!tracked) {
         return {futureIDInternal};
     }
@@ -329,24 +343,22 @@
         return WGPUStatus_Error;
     }
 
-    limits->timedWaitAnyMaxCount = dawn::kTimedWaitAnyMaxCountDefault;
+    limits->timedWaitAnyMaxCount = std::numeric_limits<size_t>::max();
     return WGPUStatus_Success;
 }
 
-// No features listed here because CreateInstance is not supported in the wire.
-static constexpr auto kSupportedFeatures = std::array<WGPUInstanceFeatureName, 0>{};
-
 DAWN_WIRE_EXPORT WGPUBool wgpuDawnWireClientHasInstanceFeature(WGPUInstanceFeatureName feature) {
-    return std::find(kSupportedFeatures.begin(), kSupportedFeatures.end(), feature) !=
-           kSupportedFeatures.end();
+    return std::find(dawn::wire::client::kSupportedFeatures.begin(),
+                     dawn::wire::client::kSupportedFeatures.end(),
+                     feature) != dawn::wire::client::kSupportedFeatures.end();
 }
 
 DAWN_WIRE_EXPORT void wgpuDawnWireClientGetInstanceFeatures(
     WGPUSupportedInstanceFeatures* features) {
     DAWN_ASSERT(features != nullptr);
 
-    features->featureCount = kSupportedFeatures.size();
-    features->features = kSupportedFeatures.data();
+    features->featureCount = dawn::wire::client::kSupportedFeatures.size();
+    features->features = dawn::wire::client::kSupportedFeatures.data();
 }
 
 DAWN_WIRE_EXPORT WGPUInstance
diff --git a/src/dawn/wire/client/Queue.cpp b/src/dawn/wire/client/Queue.cpp
index 2845059..a724441 100644
--- a/src/dawn/wire/client/Queue.cpp
+++ b/src/dawn/wire/client/Queue.cpp
@@ -102,7 +102,7 @@
 
     Client* client = GetClient();
     auto [futureIDInternal, tracked] =
-        GetEventManager().TrackEvent(std::make_unique<WorkDoneEvent>(callbackInfo));
+        GetEventManager().TrackEvent(AcquireRef(new WorkDoneEvent(callbackInfo)));
     if (!tracked) {
         return {futureIDInternal};
     }
diff --git a/src/dawn/wire/client/ShaderModule.cpp b/src/dawn/wire/client/ShaderModule.cpp
index 8493106..470cbfa 100644
--- a/src/dawn/wire/client/ShaderModule.cpp
+++ b/src/dawn/wire/client/ShaderModule.cpp
@@ -120,7 +120,7 @@
 WGPUFuture ShaderModule::APIGetCompilationInfo(
     const WGPUCompilationInfoCallbackInfo& callbackInfo) {
     auto [futureIDInternal, tracked] =
-        GetEventManager().TrackEvent(std::make_unique<CompilationInfoEvent>(callbackInfo, this));
+        GetEventManager().TrackEvent(AcquireRef(new CompilationInfoEvent(callbackInfo, this)));
     if (!tracked) {
         return {futureIDInternal};
     }