Revert "Allow WaitAny to wait with multiple threads on the same handle/fd"

This reverts commit 1b0f28ec95a4f384ed758ddb01024fe3235454a6.

Reason for revert: https://crbug.com/dawn/2413


Original change's description:
> Allow WaitAny to wait with multiple threads on the same handle/fd
>
> Internally, separate events may refer to the same SystemEvent. Waiting
> on this event from multiple threads turns out to be OK. This allows
> futures to use a fewer total number of handles/fds.
>
> Add tests that this works correctly.
>
> Bug: dawn:1987
> Change-Id: I2bff252c120d75aadcc338a74d8e3ff158a826eb
> Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/173847
> Reviewed-by: Loko Kung <lokokung@google.com>
> Reviewed-by: Kai Ninomiya <kainino@chromium.org>
> Kokoro: Kokoro <noreply+kokoro@google.com>
> Commit-Queue: Austin Eng <enga@chromium.org>

TBR=kainino@chromium.org,enga@chromium.org,noreply+kokoro@google.com,dawn-scoped@luci-project-accounts.iam.gserviceaccount.com,lokokung@google.com

Bug: dawn:1987, dawn:2413
Change-Id: Ida0c5009840aa0756e3653dcb3a46e7c7cff3673
Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/175002
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Auto-Submit: Austin Eng <enga@chromium.org>
Commit-Queue: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Reviewed-by: Loko Kung <lokokung@google.com>
Kokoro: Austin Eng <enga@chromium.org>
Kokoro: Kokoro <noreply+kokoro@google.com>
diff --git a/src/dawn/common/SerialMap.h b/src/dawn/common/SerialMap.h
index 6bea291..0ec4d96 100644
--- a/src/dawn/common/SerialMap.h
+++ b/src/dawn/common/SerialMap.h
@@ -61,7 +61,7 @@
     void Enqueue(const std::vector<Value>& values, Serial serial);
     void Enqueue(std::vector<Value>&& values, Serial serial);
 
-    Value* FindOne(Serial serial);
+    std::optional<Value> TakeOne(Serial serial);
 };
 
 // SerialMap
@@ -93,16 +93,21 @@
 }
 
 template <typename Serial, typename Value>
-Value* SerialMap<Serial, Value>::FindOne(Serial serial) {
+std::optional<Value> SerialMap<Serial, Value>::TakeOne(Serial serial) {
     auto it = this->mStorage.find(serial);
     if (it == this->mStorage.end()) {
-        return nullptr;
+        return std::nullopt;
     }
     auto& vec = it->second;
     if (vec.empty()) {
-        return nullptr;
+        return std::nullopt;
     }
-    return &vec.back();
+    Value value = std::move(vec.back());
+    vec.pop_back();
+    if (vec.empty()) {
+        this->mStorage.erase(it);
+    }
+    return value;
 }
 
 }  // namespace dawn
diff --git a/src/dawn/native/SystemEvent.h b/src/dawn/native/SystemEvent.h
index 73b7742..9e75f64 100644
--- a/src/dawn/native/SystemEvent.h
+++ b/src/dawn/native/SystemEvent.h
@@ -28,6 +28,7 @@
 #ifndef SRC_DAWN_NATIVE_SYSTEMEVENT_H_
 #define SRC_DAWN_NATIVE_SYSTEMEVENT_H_
 
+#include <optional>
 #include <utility>
 
 #include "dawn/common/MutexProtected.h"
@@ -100,12 +101,6 @@
 //   signal it by write()ing into the pipe (to make it become readable, though we won't read() it).
 std::pair<SystemEventPipeSender, SystemEventReceiver> CreateSystemEventPipe();
 
-struct SharedSystemEventReceiver : public RefCounted {
-    explicit SharedSystemEventReceiver(SystemEventReceiver&& rhs) : receiver(std::move(rhs)) {}
-
-    const SystemEventReceiver receiver;
-};
-
 class SystemEvent : public RefCounted {
   public:
     static Ref<SystemEvent> CreateSignaled();
diff --git a/src/dawn/native/d3d/QueueD3D.cpp b/src/dawn/native/d3d/QueueD3D.cpp
index 44e0f5b..f8615dc 100644
--- a/src/dawn/native/d3d/QueueD3D.cpp
+++ b/src/dawn/native/d3d/QueueD3D.cpp
@@ -41,43 +41,31 @@
         return true;
     }
 
-    auto sharedReceiver = GetOrCreateSystemEventReceiver(serial);
-
-    bool ready = false;
-    std::array<std::pair<const dawn::native::SystemEventReceiver&, bool*>, 1> events{
-        {{sharedReceiver->receiver, &ready}}};
-    DAWN_ASSERT(serial <= GetLastSubmittedCommandSerial());
-    bool didComplete = WaitAnySystemEvent(events.begin(), events.end(), timeout);
-    if (didComplete) {
-        // Clear out everything up to this serial since they are all complete now.
-        mSystemEventReceivers->ClearUpTo(serial);
-    }
-    return didComplete;
-}
-
-Ref<SharedSystemEventReceiver> Queue::GetOrCreateSystemEventReceiver(
-    ExecutionSerial completionSerial) {
-    return mSystemEventReceivers.Use([&](auto receivers) {
-        if (auto* receiver = receivers->FindOne(completionSerial)) {
-            return *receiver;
-        }
-
-        if (completionSerial <= GetCompletedCommandSerial()) {
-            return AcquireRef(
-                new SharedSystemEventReceiver(SystemEventReceiver::CreateAlreadySignaled()));
-        }
+    auto receiver = mSystemEventReceivers->TakeOne(serial);
+    if (!receiver) {
+        // Anytime we may create an event, clear out any completed receivers so the list doesn't
+        // grow forever.
+        mSystemEventReceivers->ClearUpTo(completedSerial);
 
         HANDLE fenceEvent =
             ::CreateEvent(nullptr, /*bManualReset=*/true, /*bInitialState=*/false, nullptr);
-        DAWN_CHECK(fenceEvent != nullptr);
-        SetEventOnCompletion(completionSerial, fenceEvent);
+        DAWN_INVALID_IF(fenceEvent == nullptr, "CreateEvent failed");
+        SetEventOnCompletion(serial, fenceEvent);
 
-        // Make a boxed SystemEventReceiver and enqueue it into the list of receivers.
-        auto sharedReceiver = AcquireRef(
-            new SharedSystemEventReceiver(SystemEventReceiver(SystemHandle::Acquire(fenceEvent))));
-        receivers->Enqueue(sharedReceiver, completionSerial);
-        return sharedReceiver;
-    });
+        receiver = SystemEventReceiver(SystemHandle::Acquire(fenceEvent));
+    }
+
+    bool ready = false;
+    std::array<std::pair<const dawn::native::SystemEventReceiver&, bool*>, 1> events{
+        {{*receiver, &ready}}};
+    DAWN_ASSERT(serial <= GetLastSubmittedCommandSerial());
+    bool didComplete = WaitAnySystemEvent(events.begin(), events.end(), timeout);
+    if (!didComplete) {
+        // Return the SystemEventReceiver to the pool of receivers so it can be re-waited in the
+        // future.
+        mSystemEventReceivers->Enqueue(std::move(*receiver), serial);
+    }
+    return didComplete;
 }
 
 }  // namespace dawn::native::d3d
diff --git a/src/dawn/native/d3d/QueueD3D.h b/src/dawn/native/d3d/QueueD3D.h
index 974a05d..090a91d 100644
--- a/src/dawn/native/d3d/QueueD3D.h
+++ b/src/dawn/native/d3d/QueueD3D.h
@@ -48,11 +48,9 @@
   private:
     virtual void SetEventOnCompletion(ExecutionSerial serial, HANDLE event) = 0;
 
-    Ref<SharedSystemEventReceiver> GetOrCreateSystemEventReceiver(ExecutionSerial completionSerial);
     ResultOrError<bool> WaitForQueueSerial(ExecutionSerial serial, Nanoseconds timeout) override;
 
-    MutexProtected<SerialMap<ExecutionSerial, Ref<SharedSystemEventReceiver>>>
-        mSystemEventReceivers;
+    MutexProtected<SerialMap<ExecutionSerial, SystemEventReceiver>> mSystemEventReceivers;
 };
 
 }  // namespace dawn::native::d3d
diff --git a/src/dawn/native/metal/QueueMTL.h b/src/dawn/native/metal/QueueMTL.h
index 92099e6..d3f03dd 100644
--- a/src/dawn/native/metal/QueueMTL.h
+++ b/src/dawn/native/metal/QueueMTL.h
@@ -29,7 +29,7 @@
 #define SRC_DAWN_NATIVE_METAL_QUEUEMTL_H_
 
 #import <Metal/Metal.h>
-#include <utility>
+#include <map>
 
 #include "dawn/common/MutexProtected.h"
 #include "dawn/common/SerialMap.h"
@@ -52,6 +52,7 @@
     void WaitForCommandsToBeScheduled();
     void ExportLastSignaledEvent(ExternalImageMTLSharedEventDescriptor* desc);
 
+    Ref<SystemEvent> CreateWorkDoneSystemEvent(ExecutionSerial serial);
     ResultOrError<bool> WaitForQueueSerial(ExecutionSerial serial, Nanoseconds timeout) override;
 
   private:
@@ -61,8 +62,6 @@
     MaybeError Initialize();
     void UpdateWaitingEvents(ExecutionSerial completedSerial);
 
-    Ref<SharedSystemEventReceiver> GetOrCreateSystemEventReceiver(ExecutionSerial serial);
-
     MaybeError SubmitImpl(uint32_t commandCount, CommandBufferBase* const* commands) override;
     bool HasPendingCommands() const override;
     MaybeError SubmitPendingCommands() override;
@@ -88,9 +87,7 @@
     // TODO(crbug.com/dawn/2065): If we atomically knew a conservative lower bound on the
     // mWaitingEvents serials, we could avoid taking this lock sometimes. Optimize if needed.
     // See old draft code: https://dawn-review.googlesource.com/c/dawn/+/137502/29
-    MutexProtected<SerialMap<ExecutionSerial,
-                             std::pair<SystemEventPipeSender, Ref<SharedSystemEventReceiver>>>>
-        mWaitingEvents;
+    MutexProtected<SerialMap<ExecutionSerial, Ref<SystemEvent>>> mWaitingEvents;
 
     // A shared event that can be exported for synchronization with other users of Metal.
     // MTLSharedEvent is not available until macOS 10.14+ so use just `id`.
diff --git a/src/dawn/native/metal/QueueMTL.mm b/src/dawn/native/metal/QueueMTL.mm
index b56015b..1f7901e 100644
--- a/src/dawn/native/metal/QueueMTL.mm
+++ b/src/dawn/native/metal/QueueMTL.mm
@@ -82,8 +82,8 @@
 
 void Queue::UpdateWaitingEvents(ExecutionSerial completedSerial) {
     mWaitingEvents.Use([&](auto events) {
-        for (auto& [sender, _] : events->IterateUpTo(completedSerial)) {
-            std::move(sender).Signal();
+        for (auto& s : events->IterateUpTo(completedSerial)) {
+            std::move(s)->Signal();
         }
         events->ClearUpTo(completedSerial);
     });
@@ -241,36 +241,31 @@
     }
 }
 
-Ref<SharedSystemEventReceiver> Queue::GetOrCreateSystemEventReceiver(ExecutionSerial serial) {
-    return mWaitingEvents.Use([&](auto events) {
-        if (auto* ev = events->FindOne(serial)) {
-            return ev->second;
-        }
-
+Ref<SystemEvent> Queue::CreateWorkDoneSystemEvent(ExecutionSerial serial) {
+    Ref<SystemEvent> completionEvent = AcquireRef(new SystemEvent());
+    mWaitingEvents.Use([&](auto events) {
+        SystemEventReceiver receiver;
         // Now that we hold the lock, check against mCompletedSerial before inserting.
         // This serial may have just completed. If it did, mark the event complete.
         // Also check for device loss. Otherwise, we could enqueue the event
         // after mWaitingEvents has been flushed for device loss, and it'll never get cleaned up.
         if (GetDevice()->IsLost() ||
             serial <= ExecutionSerial(mCompletedSerial.load(std::memory_order_acquire))) {
-            return AcquireRef(
-                new SharedSystemEventReceiver(SystemEventReceiver::CreateAlreadySignaled()));
+            completionEvent->Signal();
+        } else {
+            // Insert the event into the list which will be signaled inside Metal's queue
+            // completion handler.
+            events->Enqueue(completionEvent, serial);
         }
-
-        auto [sender, receiver] = CreateSystemEventPipe();
-        Ref<SharedSystemEventReceiver> sharedReceiver =
-            AcquireRef(new SharedSystemEventReceiver(std::move(receiver)));
-        events->Enqueue({std::move(sender), sharedReceiver}, serial);
-
-        return sharedReceiver;
     });
+    return completionEvent;
 }
 
 ResultOrError<bool> Queue::WaitForQueueSerial(ExecutionSerial serial, Nanoseconds timeout) {
-    Ref<SharedSystemEventReceiver> sharedReceiver = GetOrCreateSystemEventReceiver(serial);
+    Ref<SystemEvent> event = CreateWorkDoneSystemEvent(serial);
     bool ready = false;
     std::array<std::pair<const dawn::native::SystemEventReceiver&, bool*>, 1> events{
-        {{sharedReceiver->receiver, &ready}}};
+        {{event->GetOrCreateSystemEventReceiver(), &ready}}};
     return WaitAnySystemEvent(events.begin(), events.end(), timeout);
 }
 
diff --git a/src/dawn/native/vulkan/QueueVk.cpp b/src/dawn/native/vulkan/QueueVk.cpp
index 67f56d0..9e4b0cf 100644
--- a/src/dawn/native/vulkan/QueueVk.cpp
+++ b/src/dawn/native/vulkan/QueueVk.cpp
@@ -540,6 +540,7 @@
         if (waitFence == VK_NULL_HANDLE) {
             // Fence not found. This serial must have already completed.
             // Return a VK_SUCCESS status.
+            DAWN_ASSERT(serial <= GetCompletedCommandSerial());
             return VkResult::WrapUnsafe(VK_SUCCESS);
         }
         // Wait for the fence.
diff --git a/src/dawn/tests/end2end/EventTests.cpp b/src/dawn/tests/end2end/EventTests.cpp
index d911c09..10de0c1 100644
--- a/src/dawn/tests/end2end/EventTests.cpp
+++ b/src/dawn/tests/end2end/EventTests.cpp
@@ -622,117 +622,6 @@
     }
 }
 
-// Test waiting on futures with many threads when all the futures refer to the
-// same point in time. Some threads will poll, some will wait. The main thread
-// will do ProcessEvents.
-TEST_P(WaitAnyTests, ProcessEventsWhileManyThreadsWaitAnySameFutureSerial) {
-    // Timed wait any is not available on the wire.
-    DAWN_TEST_UNSUPPORTED_IF(UsesWire());
-    // TODO(dawn:2397): GL backend needs to use EGL syncs to allow cross-thread waiting.
-    DAWN_TEST_UNSUPPORTED_IF(IsOpenGL() || IsOpenGLES());
-
-    std::mutex mutex;
-    std::condition_variable cv;
-    bool run = false;
-
-    constexpr size_t kCount = 50;
-    std::vector<wgpu::Future> waitFutures(kCount);
-    std::vector<wgpu::Future> pollFutures(kCount);
-
-    // Create threads. They will each wait on a future after they are unblocked
-    // by the condition variable.
-    std::vector<std::thread> threads;
-    for (uint32_t i = 0; i < kCount; ++i) {
-        // Create threads that will wait on the future with an infinite timeout.
-        threads.emplace_back(
-            [&](uint32_t i) {
-                // Wait for the `run` flag to be set.
-                {
-                    std::unique_lock<std::mutex> lg(mutex);
-                    cv.wait(lg, [&]() { return run; });
-                }
-
-                wgpu::FutureWaitInfo waitInfo{waitFutures[i], false};
-                wgpu::WaitStatus status = instance.WaitAny(1, &waitInfo, UINT64_MAX);
-                EXPECT_EQ(status, wgpu::WaitStatus::Success);
-                EXPECT_TRUE(waitInfo.completed);
-            },
-            i);
-
-        // Create threads that will poll the future with no timeout.
-        threads.emplace_back(
-            [&](uint32_t i) {
-                // Wait for the `run` flag to be set.
-                {
-                    std::unique_lock<std::mutex> lg(mutex);
-                    cv.wait(lg, [&]() { return run; });
-                }
-
-                wgpu::FutureWaitInfo waitInfo{pollFutures[i], false};
-                wgpu::WaitStatus status;
-                do {
-                    status = instance.WaitAny(1, &waitInfo, 0);
-                } while (status == wgpu::WaitStatus::TimedOut);
-                EXPECT_EQ(status, wgpu::WaitStatus::Success);
-                EXPECT_TRUE(waitInfo.completed);
-            },
-            i);
-    }
-
-    wgpu::BufferDescriptor bufferDesc = {};
-    bufferDesc.size = 4 * 1024 * 1024;
-    bufferDesc.usage = wgpu::BufferUsage::CopySrc | wgpu::BufferUsage::CopyDst;
-
-    // Encode a buffer to buffer copy so we have some work to wait on.
-    wgpu::Buffer b1 = device.CreateBuffer(&bufferDesc);
-    wgpu::Buffer b2 = device.CreateBuffer(&bufferDesc);
-    wgpu::CommandEncoder encoder = device.CreateCommandEncoder();
-    encoder.CopyBufferToBuffer(b1, 0, b2, 0, bufferDesc.size);
-    wgpu::CommandBuffer cb = encoder.Finish();
-
-    // Set the `run` flag and wake all threads.
-    {
-        std::unique_lock<std::mutex> lg(mutex);
-
-        // Submit a command buffer and set the submitted work done futures.
-        queue.Submit(1, &cb);
-        for (uint32_t i = 0; i < kCount; ++i) {
-            waitFutures[i] = queue.OnSubmittedWorkDone({
-                nullptr,
-                wgpu::CallbackMode::WaitAnyOnly,
-                [](WGPUQueueWorkDoneStatus status, void* userdata) {},
-                nullptr,
-            });
-            pollFutures[i] = queue.OnSubmittedWorkDone({
-                nullptr,
-                wgpu::CallbackMode::WaitAnyOnly,
-                [](WGPUQueueWorkDoneStatus status, void* userdata) {},
-                nullptr,
-            });
-        }
-        run = true;
-        cv.notify_all();
-    }
-
-    bool done = false;
-    queue.OnSubmittedWorkDone({
-        nullptr,
-        wgpu::CallbackMode::AllowProcessEvents,
-        [](WGPUQueueWorkDoneStatus status, void* userdata) {
-            *static_cast<bool*>(userdata) = true;
-        },
-        &done,
-    });
-
-    while (!done) {
-        instance.ProcessEvents();
-    }
-
-    for (auto& t : threads) {
-        t.join();
-    }
-}
-
 DAWN_INSTANTIATE_TEST(WaitAnyTests,
                       D3D11Backend(),
                       D3D12Backend(),