Revert "Allow WaitAny to wait with multiple threads on the same handle/fd"
This reverts commit 1b0f28ec95a4f384ed758ddb01024fe3235454a6.
Reason for revert: https://crbug.com/dawn/2413
Original change's description:
> Allow WaitAny to wait with multiple threads on the same handle/fd
>
> Internally, separate events may refer to the same SystemEvent. Waiting
> on this event from multiple threads turns out to be OK. This allows
> futures to use a fewer total number of handles/fds.
>
> Add tests that this works correctly.
>
> Bug: dawn:1987
> Change-Id: I2bff252c120d75aadcc338a74d8e3ff158a826eb
> Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/173847
> Reviewed-by: Loko Kung <lokokung@google.com>
> Reviewed-by: Kai Ninomiya <kainino@chromium.org>
> Kokoro: Kokoro <noreply+kokoro@google.com>
> Commit-Queue: Austin Eng <enga@chromium.org>
TBR=kainino@chromium.org,enga@chromium.org,noreply+kokoro@google.com,dawn-scoped@luci-project-accounts.iam.gserviceaccount.com,lokokung@google.com
Bug: dawn:1987, dawn:2413
Change-Id: Ida0c5009840aa0756e3653dcb3a46e7c7cff3673
Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/175002
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Auto-Submit: Austin Eng <enga@chromium.org>
Commit-Queue: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Reviewed-by: Loko Kung <lokokung@google.com>
Kokoro: Austin Eng <enga@chromium.org>
Kokoro: Kokoro <noreply+kokoro@google.com>
diff --git a/src/dawn/common/SerialMap.h b/src/dawn/common/SerialMap.h
index 6bea291..0ec4d96 100644
--- a/src/dawn/common/SerialMap.h
+++ b/src/dawn/common/SerialMap.h
@@ -61,7 +61,7 @@
void Enqueue(const std::vector<Value>& values, Serial serial);
void Enqueue(std::vector<Value>&& values, Serial serial);
- Value* FindOne(Serial serial);
+ std::optional<Value> TakeOne(Serial serial);
};
// SerialMap
@@ -93,16 +93,21 @@
}
template <typename Serial, typename Value>
-Value* SerialMap<Serial, Value>::FindOne(Serial serial) {
+std::optional<Value> SerialMap<Serial, Value>::TakeOne(Serial serial) {
auto it = this->mStorage.find(serial);
if (it == this->mStorage.end()) {
- return nullptr;
+ return std::nullopt;
}
auto& vec = it->second;
if (vec.empty()) {
- return nullptr;
+ return std::nullopt;
}
- return &vec.back();
+ Value value = std::move(vec.back());
+ vec.pop_back();
+ if (vec.empty()) {
+ this->mStorage.erase(it);
+ }
+ return value;
}
} // namespace dawn
diff --git a/src/dawn/native/SystemEvent.h b/src/dawn/native/SystemEvent.h
index 73b7742..9e75f64 100644
--- a/src/dawn/native/SystemEvent.h
+++ b/src/dawn/native/SystemEvent.h
@@ -28,6 +28,7 @@
#ifndef SRC_DAWN_NATIVE_SYSTEMEVENT_H_
#define SRC_DAWN_NATIVE_SYSTEMEVENT_H_
+#include <optional>
#include <utility>
#include "dawn/common/MutexProtected.h"
@@ -100,12 +101,6 @@
// signal it by write()ing into the pipe (to make it become readable, though we won't read() it).
std::pair<SystemEventPipeSender, SystemEventReceiver> CreateSystemEventPipe();
-struct SharedSystemEventReceiver : public RefCounted {
- explicit SharedSystemEventReceiver(SystemEventReceiver&& rhs) : receiver(std::move(rhs)) {}
-
- const SystemEventReceiver receiver;
-};
-
class SystemEvent : public RefCounted {
public:
static Ref<SystemEvent> CreateSignaled();
diff --git a/src/dawn/native/d3d/QueueD3D.cpp b/src/dawn/native/d3d/QueueD3D.cpp
index 44e0f5b..f8615dc 100644
--- a/src/dawn/native/d3d/QueueD3D.cpp
+++ b/src/dawn/native/d3d/QueueD3D.cpp
@@ -41,43 +41,31 @@
return true;
}
- auto sharedReceiver = GetOrCreateSystemEventReceiver(serial);
-
- bool ready = false;
- std::array<std::pair<const dawn::native::SystemEventReceiver&, bool*>, 1> events{
- {{sharedReceiver->receiver, &ready}}};
- DAWN_ASSERT(serial <= GetLastSubmittedCommandSerial());
- bool didComplete = WaitAnySystemEvent(events.begin(), events.end(), timeout);
- if (didComplete) {
- // Clear out everything up to this serial since they are all complete now.
- mSystemEventReceivers->ClearUpTo(serial);
- }
- return didComplete;
-}
-
-Ref<SharedSystemEventReceiver> Queue::GetOrCreateSystemEventReceiver(
- ExecutionSerial completionSerial) {
- return mSystemEventReceivers.Use([&](auto receivers) {
- if (auto* receiver = receivers->FindOne(completionSerial)) {
- return *receiver;
- }
-
- if (completionSerial <= GetCompletedCommandSerial()) {
- return AcquireRef(
- new SharedSystemEventReceiver(SystemEventReceiver::CreateAlreadySignaled()));
- }
+ auto receiver = mSystemEventReceivers->TakeOne(serial);
+ if (!receiver) {
+ // Anytime we may create an event, clear out any completed receivers so the list doesn't
+ // grow forever.
+ mSystemEventReceivers->ClearUpTo(completedSerial);
HANDLE fenceEvent =
::CreateEvent(nullptr, /*bManualReset=*/true, /*bInitialState=*/false, nullptr);
- DAWN_CHECK(fenceEvent != nullptr);
- SetEventOnCompletion(completionSerial, fenceEvent);
+ DAWN_INVALID_IF(fenceEvent == nullptr, "CreateEvent failed");
+ SetEventOnCompletion(serial, fenceEvent);
- // Make a boxed SystemEventReceiver and enqueue it into the list of receivers.
- auto sharedReceiver = AcquireRef(
- new SharedSystemEventReceiver(SystemEventReceiver(SystemHandle::Acquire(fenceEvent))));
- receivers->Enqueue(sharedReceiver, completionSerial);
- return sharedReceiver;
- });
+ receiver = SystemEventReceiver(SystemHandle::Acquire(fenceEvent));
+ }
+
+ bool ready = false;
+ std::array<std::pair<const dawn::native::SystemEventReceiver&, bool*>, 1> events{
+ {{*receiver, &ready}}};
+ DAWN_ASSERT(serial <= GetLastSubmittedCommandSerial());
+ bool didComplete = WaitAnySystemEvent(events.begin(), events.end(), timeout);
+ if (!didComplete) {
+ // Return the SystemEventReceiver to the pool of receivers so it can be re-waited in the
+ // future.
+ mSystemEventReceivers->Enqueue(std::move(*receiver), serial);
+ }
+ return didComplete;
}
} // namespace dawn::native::d3d
diff --git a/src/dawn/native/d3d/QueueD3D.h b/src/dawn/native/d3d/QueueD3D.h
index 974a05d..090a91d 100644
--- a/src/dawn/native/d3d/QueueD3D.h
+++ b/src/dawn/native/d3d/QueueD3D.h
@@ -48,11 +48,9 @@
private:
virtual void SetEventOnCompletion(ExecutionSerial serial, HANDLE event) = 0;
- Ref<SharedSystemEventReceiver> GetOrCreateSystemEventReceiver(ExecutionSerial completionSerial);
ResultOrError<bool> WaitForQueueSerial(ExecutionSerial serial, Nanoseconds timeout) override;
- MutexProtected<SerialMap<ExecutionSerial, Ref<SharedSystemEventReceiver>>>
- mSystemEventReceivers;
+ MutexProtected<SerialMap<ExecutionSerial, SystemEventReceiver>> mSystemEventReceivers;
};
} // namespace dawn::native::d3d
diff --git a/src/dawn/native/metal/QueueMTL.h b/src/dawn/native/metal/QueueMTL.h
index 92099e6..d3f03dd 100644
--- a/src/dawn/native/metal/QueueMTL.h
+++ b/src/dawn/native/metal/QueueMTL.h
@@ -29,7 +29,7 @@
#define SRC_DAWN_NATIVE_METAL_QUEUEMTL_H_
#import <Metal/Metal.h>
-#include <utility>
+#include <map>
#include "dawn/common/MutexProtected.h"
#include "dawn/common/SerialMap.h"
@@ -52,6 +52,7 @@
void WaitForCommandsToBeScheduled();
void ExportLastSignaledEvent(ExternalImageMTLSharedEventDescriptor* desc);
+ Ref<SystemEvent> CreateWorkDoneSystemEvent(ExecutionSerial serial);
ResultOrError<bool> WaitForQueueSerial(ExecutionSerial serial, Nanoseconds timeout) override;
private:
@@ -61,8 +62,6 @@
MaybeError Initialize();
void UpdateWaitingEvents(ExecutionSerial completedSerial);
- Ref<SharedSystemEventReceiver> GetOrCreateSystemEventReceiver(ExecutionSerial serial);
-
MaybeError SubmitImpl(uint32_t commandCount, CommandBufferBase* const* commands) override;
bool HasPendingCommands() const override;
MaybeError SubmitPendingCommands() override;
@@ -88,9 +87,7 @@
// TODO(crbug.com/dawn/2065): If we atomically knew a conservative lower bound on the
// mWaitingEvents serials, we could avoid taking this lock sometimes. Optimize if needed.
// See old draft code: https://dawn-review.googlesource.com/c/dawn/+/137502/29
- MutexProtected<SerialMap<ExecutionSerial,
- std::pair<SystemEventPipeSender, Ref<SharedSystemEventReceiver>>>>
- mWaitingEvents;
+ MutexProtected<SerialMap<ExecutionSerial, Ref<SystemEvent>>> mWaitingEvents;
// A shared event that can be exported for synchronization with other users of Metal.
// MTLSharedEvent is not available until macOS 10.14+ so use just `id`.
diff --git a/src/dawn/native/metal/QueueMTL.mm b/src/dawn/native/metal/QueueMTL.mm
index b56015b..1f7901e 100644
--- a/src/dawn/native/metal/QueueMTL.mm
+++ b/src/dawn/native/metal/QueueMTL.mm
@@ -82,8 +82,8 @@
void Queue::UpdateWaitingEvents(ExecutionSerial completedSerial) {
mWaitingEvents.Use([&](auto events) {
- for (auto& [sender, _] : events->IterateUpTo(completedSerial)) {
- std::move(sender).Signal();
+ for (auto& s : events->IterateUpTo(completedSerial)) {
+ std::move(s)->Signal();
}
events->ClearUpTo(completedSerial);
});
@@ -241,36 +241,31 @@
}
}
-Ref<SharedSystemEventReceiver> Queue::GetOrCreateSystemEventReceiver(ExecutionSerial serial) {
- return mWaitingEvents.Use([&](auto events) {
- if (auto* ev = events->FindOne(serial)) {
- return ev->second;
- }
-
+Ref<SystemEvent> Queue::CreateWorkDoneSystemEvent(ExecutionSerial serial) {
+ Ref<SystemEvent> completionEvent = AcquireRef(new SystemEvent());
+ mWaitingEvents.Use([&](auto events) {
+ SystemEventReceiver receiver;
// Now that we hold the lock, check against mCompletedSerial before inserting.
// This serial may have just completed. If it did, mark the event complete.
// Also check for device loss. Otherwise, we could enqueue the event
// after mWaitingEvents has been flushed for device loss, and it'll never get cleaned up.
if (GetDevice()->IsLost() ||
serial <= ExecutionSerial(mCompletedSerial.load(std::memory_order_acquire))) {
- return AcquireRef(
- new SharedSystemEventReceiver(SystemEventReceiver::CreateAlreadySignaled()));
+ completionEvent->Signal();
+ } else {
+ // Insert the event into the list which will be signaled inside Metal's queue
+ // completion handler.
+ events->Enqueue(completionEvent, serial);
}
-
- auto [sender, receiver] = CreateSystemEventPipe();
- Ref<SharedSystemEventReceiver> sharedReceiver =
- AcquireRef(new SharedSystemEventReceiver(std::move(receiver)));
- events->Enqueue({std::move(sender), sharedReceiver}, serial);
-
- return sharedReceiver;
});
+ return completionEvent;
}
ResultOrError<bool> Queue::WaitForQueueSerial(ExecutionSerial serial, Nanoseconds timeout) {
- Ref<SharedSystemEventReceiver> sharedReceiver = GetOrCreateSystemEventReceiver(serial);
+ Ref<SystemEvent> event = CreateWorkDoneSystemEvent(serial);
bool ready = false;
std::array<std::pair<const dawn::native::SystemEventReceiver&, bool*>, 1> events{
- {{sharedReceiver->receiver, &ready}}};
+ {{event->GetOrCreateSystemEventReceiver(), &ready}}};
return WaitAnySystemEvent(events.begin(), events.end(), timeout);
}
diff --git a/src/dawn/native/vulkan/QueueVk.cpp b/src/dawn/native/vulkan/QueueVk.cpp
index 67f56d0..9e4b0cf 100644
--- a/src/dawn/native/vulkan/QueueVk.cpp
+++ b/src/dawn/native/vulkan/QueueVk.cpp
@@ -540,6 +540,7 @@
if (waitFence == VK_NULL_HANDLE) {
// Fence not found. This serial must have already completed.
// Return a VK_SUCCESS status.
+ DAWN_ASSERT(serial <= GetCompletedCommandSerial());
return VkResult::WrapUnsafe(VK_SUCCESS);
}
// Wait for the fence.
diff --git a/src/dawn/tests/end2end/EventTests.cpp b/src/dawn/tests/end2end/EventTests.cpp
index d911c09..10de0c1 100644
--- a/src/dawn/tests/end2end/EventTests.cpp
+++ b/src/dawn/tests/end2end/EventTests.cpp
@@ -622,117 +622,6 @@
}
}
-// Test waiting on futures with many threads when all the futures refer to the
-// same point in time. Some threads will poll, some will wait. The main thread
-// will do ProcessEvents.
-TEST_P(WaitAnyTests, ProcessEventsWhileManyThreadsWaitAnySameFutureSerial) {
- // Timed wait any is not available on the wire.
- DAWN_TEST_UNSUPPORTED_IF(UsesWire());
- // TODO(dawn:2397): GL backend needs to use EGL syncs to allow cross-thread waiting.
- DAWN_TEST_UNSUPPORTED_IF(IsOpenGL() || IsOpenGLES());
-
- std::mutex mutex;
- std::condition_variable cv;
- bool run = false;
-
- constexpr size_t kCount = 50;
- std::vector<wgpu::Future> waitFutures(kCount);
- std::vector<wgpu::Future> pollFutures(kCount);
-
- // Create threads. They will each wait on a future after they are unblocked
- // by the condition variable.
- std::vector<std::thread> threads;
- for (uint32_t i = 0; i < kCount; ++i) {
- // Create threads that will wait on the future with an infinite timeout.
- threads.emplace_back(
- [&](uint32_t i) {
- // Wait for the `run` flag to be set.
- {
- std::unique_lock<std::mutex> lg(mutex);
- cv.wait(lg, [&]() { return run; });
- }
-
- wgpu::FutureWaitInfo waitInfo{waitFutures[i], false};
- wgpu::WaitStatus status = instance.WaitAny(1, &waitInfo, UINT64_MAX);
- EXPECT_EQ(status, wgpu::WaitStatus::Success);
- EXPECT_TRUE(waitInfo.completed);
- },
- i);
-
- // Create threads that will poll the future with no timeout.
- threads.emplace_back(
- [&](uint32_t i) {
- // Wait for the `run` flag to be set.
- {
- std::unique_lock<std::mutex> lg(mutex);
- cv.wait(lg, [&]() { return run; });
- }
-
- wgpu::FutureWaitInfo waitInfo{pollFutures[i], false};
- wgpu::WaitStatus status;
- do {
- status = instance.WaitAny(1, &waitInfo, 0);
- } while (status == wgpu::WaitStatus::TimedOut);
- EXPECT_EQ(status, wgpu::WaitStatus::Success);
- EXPECT_TRUE(waitInfo.completed);
- },
- i);
- }
-
- wgpu::BufferDescriptor bufferDesc = {};
- bufferDesc.size = 4 * 1024 * 1024;
- bufferDesc.usage = wgpu::BufferUsage::CopySrc | wgpu::BufferUsage::CopyDst;
-
- // Encode a buffer to buffer copy so we have some work to wait on.
- wgpu::Buffer b1 = device.CreateBuffer(&bufferDesc);
- wgpu::Buffer b2 = device.CreateBuffer(&bufferDesc);
- wgpu::CommandEncoder encoder = device.CreateCommandEncoder();
- encoder.CopyBufferToBuffer(b1, 0, b2, 0, bufferDesc.size);
- wgpu::CommandBuffer cb = encoder.Finish();
-
- // Set the `run` flag and wake all threads.
- {
- std::unique_lock<std::mutex> lg(mutex);
-
- // Submit a command buffer and set the submitted work done futures.
- queue.Submit(1, &cb);
- for (uint32_t i = 0; i < kCount; ++i) {
- waitFutures[i] = queue.OnSubmittedWorkDone({
- nullptr,
- wgpu::CallbackMode::WaitAnyOnly,
- [](WGPUQueueWorkDoneStatus status, void* userdata) {},
- nullptr,
- });
- pollFutures[i] = queue.OnSubmittedWorkDone({
- nullptr,
- wgpu::CallbackMode::WaitAnyOnly,
- [](WGPUQueueWorkDoneStatus status, void* userdata) {},
- nullptr,
- });
- }
- run = true;
- cv.notify_all();
- }
-
- bool done = false;
- queue.OnSubmittedWorkDone({
- nullptr,
- wgpu::CallbackMode::AllowProcessEvents,
- [](WGPUQueueWorkDoneStatus status, void* userdata) {
- *static_cast<bool*>(userdata) = true;
- },
- &done,
- });
-
- while (!done) {
- instance.ProcessEvents();
- }
-
- for (auto& t : threads) {
- t.join();
- }
-}
-
DAWN_INSTANTIATE_TEST(WaitAnyTests,
D3D11Backend(),
D3D12Backend(),