[WGPUFuture] Implements mapAsync and use futures in wire.
- Extend TrackedEvents for different events so that we can properly
pass around data that needs to be returned.
- We implement mapAsync and use futures in the wire for WorkDone in the
same change because those two need to be on the same code path to
ensure ordering.
- Removes a test that is no longer relevant since `signalValue` is to be
deprecated, and was failing since the new path does not provide it.
- Note for the wire we just use Spontaneous for the old path for now
because that's effectively what it was doing before.
Bug: dawn:2052, dawn:2060
Change-Id: I2ab2d8e65f09d2685625b8bfa6a633c907eab49d
Reviewed-on: https://dawn-review.googlesource.com/c/dawn/+/152621
Commit-Queue: Loko Kung <lokokung@google.com>
Kokoro: Kokoro <noreply+kokoro@google.com>
Reviewed-by: Corentin Wallez <cwallez@chromium.org>
diff --git a/dawn.json b/dawn.json
index b460237..fb73083 100644
--- a/dawn.json
+++ b/dawn.json
@@ -546,6 +546,15 @@
{"name": "userdata", "type": "void *"}
]
},
+ "buffer map callback info": {
+ "category": "structure",
+ "extensible": "in",
+ "members": [
+ {"name": "mode", "type": "callback mode"},
+ {"name": "callback", "type": "buffer map callback"},
+ {"name": "userdata", "type": "void *"}
+ ]
+ },
"buffer map async status": {
"category": "enum",
"emscripten_no_enum_table": true,
diff --git a/dawn_wire.json b/dawn_wire.json
index 9137f2d..3f334d6 100644
--- a/dawn_wire.json
+++ b/dawn_wire.json
@@ -20,7 +20,7 @@
"commands": {
"buffer map async": [
{ "name": "buffer id", "type": "ObjectId", "id_type": "buffer" },
- { "name": "request serial", "type": "uint64_t" },
+ { "name": "future", "type": "future" },
{ "name": "mode", "type": "map mode" },
{ "name": "offset", "type": "uint64_t"},
{ "name": "size", "type": "uint64_t"}
@@ -64,7 +64,7 @@
"queue on submitted work done": [
{ "name": "queue id", "type": "ObjectId", "id_type": "queue" },
{ "name": "signal value", "type": "uint64_t" },
- { "name": "request serial", "type": "uint64_t" }
+ { "name": "future", "type": "future" }
],
"queue write buffer": [
{"name": "queue id", "type": "ObjectId", "id_type": "queue" },
@@ -101,7 +101,7 @@
"return commands": {
"buffer map async callback": [
{ "name": "buffer", "type": "ObjectHandle", "handle_type": "buffer" },
- { "name": "request serial", "type": "uint64_t" },
+ { "name": "future", "type": "future" },
{ "name": "status", "type": "uint32_t" },
{ "name": "read data update info length", "type": "uint64_t" },
{ "name": "read data update info", "type": "uint8_t", "annotation": "const*", "length": "read data update info length", "skip_serialize": true }
@@ -141,7 +141,7 @@
],
"queue work done callback": [
{ "name": "queue", "type": "ObjectHandle", "handle_type": "queue" },
- { "name": "request serial", "type": "uint64_t" },
+ { "name": "future", "type": "future" },
{ "name": "status", "type": "queue work done status" }
],
"shader module get compilation info callback": [
@@ -188,6 +188,7 @@
"AdapterEnumerateFeatures",
"AdapterRequestDevice",
"BufferMapAsync",
+ "BufferMapAsyncF",
"BufferGetConstMappedRange",
"BufferGetMappedRange",
"BufferGetMapState",
diff --git a/src/dawn/tests/end2end/DestroyTests.cpp b/src/dawn/tests/end2end/DestroyTests.cpp
index 962e6a8..e61ac36 100644
--- a/src/dawn/tests/end2end/DestroyTests.cpp
+++ b/src/dawn/tests/end2end/DestroyTests.cpp
@@ -210,9 +210,17 @@
ASSERT_DEVICE_ERROR(queue.OnSubmittedWorkDone(
0u,
[](WGPUQueueWorkDoneStatus status, void* userdata) {
- EXPECT_EQ(status, WGPUQueueWorkDoneStatus_DeviceLost);
+ // TODO(crbug.com/dawn/2021): Wire and native differ slightly for now. Unify once we
+ // decide on the correct result. In theory maybe we want to pretend that things succeed
+ // when the device is lost.
+ DestroyTest* test = static_cast<DestroyTest*>(userdata);
+ if (test->UsesWire()) {
+ EXPECT_EQ(status, WGPUQueueWorkDoneStatus_Success);
+ } else {
+ EXPECT_EQ(status, WGPUQueueWorkDoneStatus_DeviceLost);
+ }
},
- nullptr));
+ this));
}
DAWN_INSTANTIATE_TEST(DestroyTest,
diff --git a/src/dawn/tests/unittests/validation/QueueOnSubmittedWorkDoneValidationTests.cpp b/src/dawn/tests/unittests/validation/QueueOnSubmittedWorkDoneValidationTests.cpp
index 0cbf993..55263f2 100644
--- a/src/dawn/tests/unittests/validation/QueueOnSubmittedWorkDoneValidationTests.cpp
+++ b/src/dawn/tests/unittests/validation/QueueOnSubmittedWorkDoneValidationTests.cpp
@@ -47,11 +47,3 @@
WaitForAllOperations(device);
}
-
-// Test that OnSubmittedWorkDone is an error if signalValue isn't 0.
-TEST_F(QueueOnSubmittedWorkDoneValidationTests, SignaledValueNotZeroIsInvalid) {
- EXPECT_CALL(*mockQueueWorkDoneCallback, Call(WGPUQueueWorkDoneStatus_Error, this)).Times(1);
- ASSERT_DEVICE_ERROR(device.GetQueue().OnSubmittedWorkDone(1u, ToMockQueueWorkDone, this));
-
- WaitForAllOperations(device);
-}
diff --git a/src/dawn/wire/client/Buffer.cpp b/src/dawn/wire/client/Buffer.cpp
index 93ccba3..0713ce6 100644
--- a/src/dawn/wire/client/Buffer.cpp
+++ b/src/dawn/wire/client/Buffer.cpp
@@ -14,6 +14,7 @@
#include "dawn/wire/client/Buffer.h"
+#include <functional>
#include <limits>
#include <utility>
@@ -23,8 +24,8 @@
#include "dawn/wire/client/Device.h"
namespace dawn::wire::client {
-
namespace {
+
WGPUBuffer CreateErrorBufferOOMAtClient(Device* device, const WGPUBufferDescriptor* descriptor) {
if (descriptor->mappedAtCreation) {
return nullptr;
@@ -36,6 +37,32 @@
errorBufferDescriptor.nextInChain = &errorInfo.chain;
return GetProcs().deviceCreateErrorBuffer(ToAPI(device), &errorBufferDescriptor);
}
+
+struct MapAsyncEvent : public TrackedEvent {
+ explicit MapAsyncEvent(const WGPUBufferMapCallbackInfo& callbackInfo)
+ : TrackedEvent(callbackInfo.mode, callbackInfo.userdata),
+ mCallback(callbackInfo.callback) {}
+
+ void CompleteImpl(EventCompletionType completionType) override {
+ WGPUBufferMapAsyncStatus status = completionType == EventCompletionType::Shutdown
+ ? WGPUBufferMapAsyncStatus_DeviceLost
+ : WGPUBufferMapAsyncStatus_Success;
+ if (mStatus) {
+ status = *mStatus;
+ }
+ if (mCallback) {
+ mCallback(status, mUserdata);
+ }
+ }
+
+ static void MapAsyncEventReady(TrackedEvent& event, WGPUBufferMapAsyncStatus status) {
+ static_cast<MapAsyncEvent&>(event).mStatus = status;
+ }
+
+ WGPUBufferMapCallback mCallback;
+ std::optional<WGPUBufferMapAsyncStatus> mStatus;
+};
+
} // anonymous namespace
// static
@@ -145,19 +172,17 @@
InvokeAndClearCallback(WGPUBufferMapAsyncStatus_DestroyedBeforeCallback);
}
-void Buffer::CancelCallbacksForDisconnect() {
- InvokeAndClearCallback(WGPUBufferMapAsyncStatus_DeviceLost);
-}
-
void Buffer::InvokeAndClearCallback(WGPUBufferMapAsyncStatus status) {
- WGPUBufferMapCallback callback = mRequest.callback;
- void* userdata = mRequest.userdata;
- mRequest.callback = nullptr;
- mRequest.userdata = nullptr;
- mPendingMap = false;
- if (callback != nullptr) {
- callback(status, userdata);
+ if (!mPendingMapRequest) {
+ // Since this is unconditionally called on destruction, we might not have a pending map
+ // request all the time.
+ return;
}
+
+ FutureID futureID = mPendingMapRequest->futureID;
+ mPendingMapRequest.reset();
+ GetClient()->GetEventManager()->SetFutureReady(
+ futureID, std::bind(MapAsyncEvent::MapAsyncEventReady, std::placeholders::_1, status));
}
void Buffer::MapAsync(WGPUMapModeFlags mode,
@@ -165,15 +190,31 @@
size_t size,
WGPUBufferMapCallback callback,
void* userdata) {
+ WGPUBufferMapCallbackInfo callbackInfo = {};
+ callbackInfo.mode = WGPUCallbackMode_AllowSpontaneous;
+ callbackInfo.callback = callback;
+ callbackInfo.userdata = userdata;
+ MapAsyncF(mode, offset, size, callbackInfo);
+}
+
+WGPUFuture Buffer::MapAsyncF(WGPUMapModeFlags mode,
+ size_t offset,
+ size_t size,
+ const WGPUBufferMapCallbackInfo& callbackInfo) {
DAWN_ASSERT(GetRefcount() != 0);
- if (mPendingMap) {
- return callback(WGPUBufferMapAsyncStatus_MappingAlreadyPending, userdata);
+ Client* client = GetClient();
+ auto [futureIDInternal, tracked] =
+ client->GetEventManager()->TrackEvent(new MapAsyncEvent(callbackInfo));
+ if (!tracked) {
+ return {futureIDInternal};
}
- Client* client = GetClient();
- if (client->IsDisconnected()) {
- return callback(WGPUBufferMapAsyncStatus_DeviceLost, userdata);
+ if (mPendingMapRequest) {
+ client->GetEventManager()->SetFutureReady(
+ futureIDInternal, std::bind(MapAsyncEvent::MapAsyncEventReady, std::placeholders::_1,
+ WGPUBufferMapAsyncStatus_MappingAlreadyPending));
+ return {futureIDInternal};
}
// Handle the defaulting of size required by WebGPU.
@@ -181,45 +222,34 @@
size = mSize - offset;
}
- // Set up the request structure that will hold information while this mapping is
- // in flight.
- mRequest.callback = callback;
- mRequest.userdata = userdata;
- mRequest.offset = offset;
- mRequest.size = size;
+ // Set up the request structure that will hold information while this mapping is in flight.
+ MapRequestType mapMode = MapRequestType::None;
if (mode & WGPUMapMode_Read) {
- mRequest.type = MapRequestType::Read;
+ mapMode = MapRequestType::Read;
} else if (mode & WGPUMapMode_Write) {
- mRequest.type = MapRequestType::Write;
+ mapMode = MapRequestType::Write;
}
+ mPendingMapRequest = {futureIDInternal, offset, size, mapMode};
// Serialize the command to send to the server.
- mPendingMap = true;
- mSerial++;
BufferMapAsyncCmd cmd;
cmd.bufferId = GetWireId();
- cmd.requestSerial = mSerial;
+ cmd.future = {futureIDInternal};
cmd.mode = mode;
cmd.offset = offset;
cmd.size = size;
client->SerializeCommand(cmd);
+ return {futureIDInternal};
}
-bool Buffer::OnMapAsyncCallback(uint64_t requestSerial,
+bool Buffer::OnMapAsyncCallback(WGPUFuture future,
uint32_t status,
uint64_t readDataUpdateInfoLength,
const uint8_t* readDataUpdateInfo) {
- // If requestSerial doesn't match mSerial the corresponding request must have
- // already been rejected by unmap or destroy and another MapAsync request must
- // have been issued.
- if (mSerial != requestSerial) {
- return true;
- }
-
- // If mPendingMap is false the request must have been already rejected
- // by unmap or destroy.
- if (!mPendingMap) {
+ // Check that the response doesn't correspond to a request that has already been rejected by
+ // unmap or destroy.
+ if (!mPendingMapRequest || mPendingMapRequest->futureID != future.id) {
return true;
}
@@ -229,7 +259,7 @@
};
if (status == WGPUBufferMapAsyncStatus_Success) {
- switch (mRequest.type) {
+ switch (mPendingMapRequest->type) {
case MapRequestType::Read: {
if (readDataUpdateInfoLength > std::numeric_limits<size_t>::max()) {
// This is the size of data deserialized from the command stream, which must
@@ -244,7 +274,7 @@
// Update user map data with server returned data
if (!mReadHandle->DeserializeDataUpdate(
readDataUpdateInfo, static_cast<size_t>(readDataUpdateInfoLength),
- mRequest.offset, mRequest.size)) {
+ mPendingMapRequest->offset, mPendingMapRequest->size)) {
return FailRequest();
}
mMapState = MapState::MappedForRead;
@@ -263,8 +293,8 @@
DAWN_UNREACHABLE();
}
- mMapOffset = mRequest.offset;
- mMapSize = mRequest.size;
+ mMapOffset = mPendingMapRequest->offset;
+ mMapSize = mPendingMapRequest->size;
}
InvokeAndClearCallback(static_cast<WGPUBufferMapAsyncStatus>(status));
@@ -378,7 +408,7 @@
case MapState::MappedAtCreation:
return WGPUBufferMapState_Mapped;
case MapState::Unmapped:
- if (mPendingMap) {
+ if (mPendingMapRequest) {
return WGPUBufferMapState_Pending;
} else {
return WGPUBufferMapState_Unmapped;
diff --git a/src/dawn/wire/client/Buffer.h b/src/dawn/wire/client/Buffer.h
index d5fc573..194bbba 100644
--- a/src/dawn/wire/client/Buffer.h
+++ b/src/dawn/wire/client/Buffer.h
@@ -16,7 +16,9 @@
#define SRC_DAWN_WIRE_CLIENT_BUFFER_H_
#include <memory>
+#include <optional>
+#include "dawn/common/FutureUtils.h"
#include "dawn/webgpu.h"
#include "dawn/wire/WireClient.h"
#include "dawn/wire/client/ObjectBase.h"
@@ -32,7 +34,7 @@
Buffer(const ObjectBaseParams& params, const WGPUBufferDescriptor* descriptor);
~Buffer() override;
- bool OnMapAsyncCallback(uint64_t requestSerial,
+ bool OnMapAsyncCallback(WGPUFuture future,
uint32_t status,
uint64_t readDataUpdateInfoLength,
const uint8_t* readDataUpdateInfo);
@@ -41,6 +43,10 @@
size_t size,
WGPUBufferMapCallback callback,
void* userdata);
+ WGPUFuture MapAsyncF(WGPUMapModeFlags mode,
+ size_t offset,
+ size_t size,
+ const WGPUBufferMapCallbackInfo& callbackInfo);
void* GetMappedRange(size_t offset, size_t size);
const void* GetConstMappedRange(size_t offset, size_t size);
void Unmap();
@@ -54,7 +60,6 @@
WGPUBufferMapState GetMapState() const;
private:
- void CancelCallbacksForDisconnect() override;
void InvokeAndClearCallback(WGPUBufferMapAsyncStatus status);
bool IsMappedForReading() const;
@@ -72,18 +77,15 @@
MappedAtCreation,
};
- // Up to only one request can exist at a single time.
- // Other requests are rejected.
+ // Up to only one request can exist at a single time. Other requests are rejected.
struct MapRequestData {
- WGPUBufferMapCallback callback = nullptr;
- void* userdata = nullptr;
+ FutureID futureID = kNullFutureID;
size_t offset = 0;
size_t size = 0;
MapRequestType type = MapRequestType::None;
};
- MapRequestData mRequest;
- bool mPendingMap = false;
- uint64_t mSerial = 0;
+ std::optional<MapRequestData> mPendingMapRequest;
+
uint64_t mSize = 0;
WGPUBufferUsage mUsage;
diff --git a/src/dawn/wire/client/ClientDoers.cpp b/src/dawn/wire/client/ClientDoers.cpp
index ab449a8..9aa452d 100644
--- a/src/dawn/wire/client/ClientDoers.cpp
+++ b/src/dawn/wire/client/ClientDoers.cpp
@@ -76,7 +76,7 @@
}
bool Client::DoBufferMapAsyncCallback(Buffer* buffer,
- uint64_t requestSerial,
+ WGPUFuture future,
uint32_t status,
uint64_t readDataUpdateInfoLength,
const uint8_t* readDataUpdateInfo) {
@@ -84,18 +84,17 @@
if (buffer == nullptr) {
return true;
}
- return buffer->OnMapAsyncCallback(requestSerial, status, readDataUpdateInfoLength,
- readDataUpdateInfo);
+ return buffer->OnMapAsyncCallback(future, status, readDataUpdateInfoLength, readDataUpdateInfo);
}
bool Client::DoQueueWorkDoneCallback(Queue* queue,
- uint64_t requestSerial,
+ WGPUFuture future,
WGPUQueueWorkDoneStatus status) {
// The queue might have been deleted or recreated so this isn't an error.
if (queue == nullptr) {
return true;
}
- return queue->OnWorkDoneCallback(requestSerial, status);
+ return queue->OnWorkDoneCallback(future, status);
}
bool Client::DoDeviceCreateComputePipelineAsyncCallback(Device* device,
diff --git a/src/dawn/wire/client/EventManager.cpp b/src/dawn/wire/client/EventManager.cpp
index 1e0f1e0..4a28e7a 100644
--- a/src/dawn/wire/client/EventManager.cpp
+++ b/src/dawn/wire/client/EventManager.cpp
@@ -13,6 +13,7 @@
// limitations under the License.
#include <map>
+#include <optional>
#include <utility>
#include <vector>
@@ -26,18 +27,17 @@
EventManager::EventManager(Client* client) : mClient(client) {}
-std::pair<FutureID, bool> EventManager::TrackEvent(WGPUCallbackMode mode,
- EventCallback&& callback) {
+std::pair<FutureID, bool> EventManager::TrackEvent(TrackedEvent* event) {
FutureID futureID = mNextFutureID++;
+ std::unique_ptr<TrackedEvent> ptr(event);
if (mClient->IsDisconnected()) {
- callback(EventCompletionType::Shutdown);
+ std::move(ptr)->Complete(EventCompletionType::Shutdown);
return {futureID, false};
}
mTrackedEvents.Use([&](auto trackedEvents) {
- auto [it, inserted] =
- trackedEvents->emplace(futureID, TrackedEvent(mode, std::move(callback)));
+ auto [it, inserted] = trackedEvents->emplace(futureID, std::move(ptr));
DAWN_ASSERT(inserted);
});
@@ -47,7 +47,7 @@
void EventManager::ShutDown() {
// Call any outstanding callbacks before destruction.
while (true) {
- std::map<FutureID, TrackedEvent> movedEvents;
+ std::map<FutureID, std::unique_ptr<TrackedEvent>> movedEvents;
mTrackedEvents.Use([&](auto trackedEvents) { movedEvents = std::move(*trackedEvents); });
if (movedEvents.empty()) {
@@ -56,55 +56,61 @@
// Ordering guaranteed because we are using a sorted map.
for (auto& [futureID, trackedEvent] : movedEvents) {
- // Event should be already marked Ready since events are actually driven by
- // RequestTrackers (at the time of this writing), which all shut down before this.
- DAWN_ASSERT(trackedEvent.mReady);
- trackedEvent.mCallback(EventCompletionType::Shutdown);
- trackedEvent.mCallback = nullptr;
+ std::move(trackedEvent)->Complete(EventCompletionType::Shutdown);
}
}
}
-void EventManager::SetFutureReady(FutureID futureID) {
+void EventManager::SetFutureReady(FutureID futureID, std::function<void(TrackedEvent&)>&& ready) {
DAWN_ASSERT(futureID > 0);
+ // If the client was already disconnected, then all the callbacks should already have fired so
+ // we don't need to fire the callback anymore.
+ if (mClient->IsDisconnected()) {
+ return;
+ }
+
+ std::optional<std::unique_ptr<TrackedEvent>> event;
mTrackedEvents.Use([&](auto trackedEvents) {
- TrackedEvent& trackedEvent = trackedEvents->at(futureID); // Asserts futureID is in the map
- trackedEvent.mReady = true;
+ std::unique_ptr<TrackedEvent>& trackedEvent =
+ trackedEvents->at(futureID); // Asserts futureID is in the map
+ trackedEvent->mReady = true;
+ if (ready) {
+ ready(*trackedEvent);
+ }
+
+ // If the event can be spontaneously completed, do so now.
+ if (trackedEvent->mMode == WGPUCallbackMode_AllowSpontaneous) {
+ event = std::move(trackedEvent);
+ trackedEvents->erase(futureID);
+ }
});
- // TODO(crbug.com/dawn/2059): Handle spontaneous completions.
+
+ // Handle spontaneous completions.
+ if (event.has_value()) {
+ std::move(*event)->Complete(EventCompletionType::Ready);
+ }
}
void EventManager::ProcessPollEvents() {
// Since events are already stored in an ordered map, this list must already be ordered.
- std::vector<TrackedEvent> eventsToCompleteNow;
-
- // TODO(crbug.com/dawn/2060): EventManager shouldn't bother to track ProcessEvents-type events
- // until they've completed. We can queue them up when they're received on the wire. (Before that
- // point, the RequestTracker tracks them. If/when we merge this with RequestTracker, then we'll
- // track both here but still don't need to queue them for ProcessEvents until they complete.)
+ std::vector<std::unique_ptr<TrackedEvent>> eventsToCompleteNow;
mTrackedEvents.Use([&](auto trackedEvents) {
for (auto it = trackedEvents->begin(); it != trackedEvents->end();) {
- TrackedEvent& event = it->second;
- bool shouldRemove = (event.mMode == WGPUCallbackMode_AllowProcessEvents ||
- event.mMode == WGPUCallbackMode_AllowSpontaneous) &&
- event.mReady;
+ std::unique_ptr<TrackedEvent>& event = it->second;
+ bool shouldRemove = (event->mMode == WGPUCallbackMode_AllowProcessEvents ||
+ event->mMode == WGPUCallbackMode_AllowSpontaneous) &&
+ event->mReady;
if (!shouldRemove) {
++it;
continue;
}
-
- // mCallback may still be null if it's stale (was already spontaneously completed).
- if (event.mCallback) {
- eventsToCompleteNow.emplace_back(std::move(event));
- }
+ eventsToCompleteNow.emplace_back(std::move(event));
it = trackedEvents->erase(it);
}
});
- for (TrackedEvent& event : eventsToCompleteNow) {
- DAWN_ASSERT(event.mReady && event.mCallback);
- event.mCallback(EventCompletionType::Ready);
- event.mCallback = nullptr;
+ for (std::unique_ptr<TrackedEvent>& event : eventsToCompleteNow) {
+ std::move(event)->Complete(EventCompletionType::Ready);
}
}
@@ -124,7 +130,7 @@
// Since the user can specify the FutureIDs in any order, we need to use another ordered map
// here to ensure that the result is ordered for JS event ordering.
- std::map<FutureID, TrackedEvent> eventsToCompleteNow;
+ std::map<FutureID, std::unique_ptr<TrackedEvent>> eventsToCompleteNow;
bool anyCompleted = false;
const FutureID firstInvalidFutureID = mNextFutureID;
mTrackedEvents.Use([&](auto trackedEvents) {
@@ -139,38 +145,35 @@
continue;
}
- TrackedEvent& event = it->second;
+ std::unique_ptr<TrackedEvent>& event = it->second;
// Early update .completed, in prep to complete the callback if ready.
- infos[i].completed = event.mReady;
- if (event.mReady) {
+ infos[i].completed = event->mReady;
+ if (event->mReady) {
anyCompleted = true;
- if (event.mCallback) {
- eventsToCompleteNow.emplace(it->first, std::move(event));
- }
+ eventsToCompleteNow.emplace(it->first, std::move(event));
trackedEvents->erase(it);
}
}
});
- // TODO(crbug.com/dawn/2066): Guarantee the event ordering from the JS spec.
for (auto& [_, event] : eventsToCompleteNow) {
- DAWN_ASSERT(event.mReady && event.mCallback);
// .completed has already been set to true (before the callback, per API contract).
- event.mCallback(EventCompletionType::Ready);
- event.mCallback = nullptr;
+ std::move(event)->Complete(EventCompletionType::Ready);
}
return anyCompleted ? WGPUWaitStatus_Success : WGPUWaitStatus_TimedOut;
}
-// EventManager::TrackedEvent
+// TrackedEvent
-EventManager::TrackedEvent::TrackedEvent(WGPUCallbackMode mode, EventCallback&& callback)
- : mMode(mode), mCallback(callback) {}
+TrackedEvent::TrackedEvent(WGPUCallbackMode mode, void* userdata)
+ : mMode(mode), mUserdata(userdata) {}
-EventManager::TrackedEvent::~TrackedEvent() {
- // Make sure we're not dropping a callback on the floor.
- DAWN_ASSERT(!mCallback);
+TrackedEvent::~TrackedEvent() = default;
+
+void TrackedEvent::Complete(EventCompletionType type) {
+ DAWN_ASSERT(type == EventCompletionType::Shutdown || mReady);
+ CompleteImpl(type);
}
} // namespace dawn::wire::client
diff --git a/src/dawn/wire/client/EventManager.h b/src/dawn/wire/client/EventManager.h
index 2c599e2..0512250 100644
--- a/src/dawn/wire/client/EventManager.h
+++ b/src/dawn/wire/client/EventManager.h
@@ -18,6 +18,7 @@
#include <cstddef>
#include <functional>
#include <map>
+#include <memory>
#include <utility>
#include "dawn/common/FutureUtils.h"
@@ -31,8 +32,21 @@
class Client;
-// Code to run to complete the event (after receiving a ready notification from the wire).
-using EventCallback = std::function<void(EventCompletionType)>;
+struct TrackedEvent : NonMovable {
+ TrackedEvent(WGPUCallbackMode mode, void* userdata);
+ virtual ~TrackedEvent();
+
+ void Complete(EventCompletionType type);
+
+ WGPUCallbackMode mMode;
+ void* mUserdata = nullptr;
+ // These states don't need to be atomic because they're always protected by mTrackedEventsMutex
+ // (or moved out to a local variable).
+ bool mReady = false;
+
+ protected:
+ virtual void CompleteImpl(EventCompletionType type) = 0;
+};
// Subcomponent which tracks callback events for the Future-based callback
// entrypoints. All events from this instance (regardless of whether from an adapter, device, queue,
@@ -46,34 +60,19 @@
// Returns a pair of the FutureID and a bool that is true iff the event was successfuly tracked,
// false otherwise. Events may not be tracked if the client is already disconnected.
- std::pair<FutureID, bool> TrackEvent(WGPUCallbackMode mode, EventCallback&& callback);
+ std::pair<FutureID, bool> TrackEvent(TrackedEvent* event);
void ShutDown();
- void SetFutureReady(FutureID futureID);
+ void SetFutureReady(FutureID futureID, std::function<void(TrackedEvent&)>&& ready = {});
void ProcessPollEvents();
WGPUWaitStatus WaitAny(size_t count, WGPUFutureWaitInfo* infos, uint64_t timeoutNS);
private:
- struct TrackedEvent : dawn::NonCopyable {
- TrackedEvent(WGPUCallbackMode mode, EventCallback&& callback);
- ~TrackedEvent();
-
- TrackedEvent(TrackedEvent&&) = default;
- TrackedEvent& operator=(TrackedEvent&&) = default;
-
- WGPUCallbackMode mMode;
- // Callback. Falsey if already called.
- EventCallback mCallback;
- // These states don't need to be atomic because they're always protected by
- // mTrackedEventsMutex (or moved out to a local variable).
- bool mReady = false;
- };
-
Client* mClient;
// Tracks all kinds of events (for both WaitAny and ProcessEvents). We use an ordered map so
// that in most cases, event ordering is already implicit when we iterate the map. (Not true for
// WaitAny though because the user could specify the FutureIDs out of order.)
- MutexProtected<std::map<FutureID, TrackedEvent>> mTrackedEvents;
+ MutexProtected<std::map<FutureID, std::unique_ptr<TrackedEvent>>> mTrackedEvents;
std::atomic<FutureID> mNextFutureID = 1;
};
diff --git a/src/dawn/wire/client/Queue.cpp b/src/dawn/wire/client/Queue.cpp
index ad7c7e5..311fd72 100644
--- a/src/dawn/wire/client/Queue.cpp
+++ b/src/dawn/wire/client/Queue.cpp
@@ -18,38 +18,54 @@
#include "dawn/wire/client/EventManager.h"
namespace dawn::wire::client {
+namespace {
-Queue::~Queue() {
- ClearAllCallbacks(WGPUQueueWorkDoneStatus_Unknown);
-}
+struct WorkDoneEvent : public TrackedEvent {
+ explicit WorkDoneEvent(const WGPUQueueWorkDoneCallbackInfo& callbackInfo)
+ : TrackedEvent(callbackInfo.mode, callbackInfo.userdata),
+ mCallback(callbackInfo.callback) {}
-bool Queue::OnWorkDoneCallback(uint64_t requestSerial, WGPUQueueWorkDoneStatus status) {
- OnWorkDoneData request;
- if (!mOnWorkDoneRequests.Acquire(requestSerial, &request)) {
- return false;
+ void CompleteImpl(EventCompletionType completionType) override {
+ WGPUQueueWorkDoneStatus status = completionType == EventCompletionType::Shutdown
+ ? WGPUQueueWorkDoneStatus_DeviceLost
+ : WGPUQueueWorkDoneStatus_Success;
+ if (mStatus) {
+ // TODO(crbug.com/dawn/2021): Pretend things success when the device is lost.
+ status = *mStatus == WGPUQueueWorkDoneStatus_DeviceLost
+ ? WGPUQueueWorkDoneStatus_Success
+ : *mStatus;
+ }
+ if (mCallback) {
+ mCallback(status, mUserdata);
+ }
}
- request.callback(status, request.userdata);
+ static void WorkDoneEventReady(TrackedEvent& event, WGPUQueueWorkDoneStatus status) {
+ static_cast<WorkDoneEvent&>(event).mStatus = status;
+ }
+
+ WGPUQueueWorkDoneCallback mCallback;
+ std::optional<WGPUQueueWorkDoneStatus> mStatus;
+};
+
+} // anonymous namespace
+
+Queue::~Queue() = default;
+
+bool Queue::OnWorkDoneCallback(WGPUFuture future, WGPUQueueWorkDoneStatus status) {
+ GetClient()->GetEventManager()->SetFutureReady(
+ future.id, std::bind(WorkDoneEvent::WorkDoneEventReady, std::placeholders::_1, status));
return true;
}
void Queue::OnSubmittedWorkDone(uint64_t signalValue,
WGPUQueueWorkDoneCallback callback,
void* userdata) {
- Client* client = GetClient();
- if (client->IsDisconnected()) {
- callback(WGPUQueueWorkDoneStatus_DeviceLost, userdata);
- return;
- }
-
- uint64_t serial = mOnWorkDoneRequests.Add({callback, userdata});
-
- QueueOnSubmittedWorkDoneCmd cmd;
- cmd.queueId = GetWireId();
- cmd.signalValue = signalValue;
- cmd.requestSerial = serial;
-
- client->SerializeCommand(cmd);
+ WGPUQueueWorkDoneCallbackInfo callbackInfo = {};
+ callbackInfo.mode = WGPUCallbackMode_AllowSpontaneous;
+ callbackInfo.callback = callback;
+ callbackInfo.userdata = userdata;
+ OnSubmittedWorkDoneF(callbackInfo);
}
WGPUFuture Queue::OnSubmittedWorkDoneF(const WGPUQueueWorkDoneCallbackInfo& callbackInfo) {
@@ -58,34 +74,16 @@
DAWN_ASSERT(callbackInfo.nextInChain == nullptr);
Client* client = GetClient();
- auto [futureIDInternal, tracked] = client->GetEventManager()->TrackEvent(
- callbackInfo.mode, [=](EventCompletionType completionType) {
- WGPUQueueWorkDoneStatus status = completionType == EventCompletionType::Shutdown
- ? WGPUQueueWorkDoneStatus_Unknown
- : WGPUQueueWorkDoneStatus_Success;
- callbackInfo.callback(status, callbackInfo.userdata);
- });
+ auto [futureIDInternal, tracked] =
+ client->GetEventManager()->TrackEvent(new WorkDoneEvent(callbackInfo));
if (!tracked) {
return {futureIDInternal};
}
- struct Lambda {
- Client* client;
- FutureID futureIDInternal;
- };
- Lambda* lambda = new Lambda{client, futureIDInternal};
- uint64_t serial = mOnWorkDoneRequests.Add(
- {[](WGPUQueueWorkDoneStatus /* ignored */, void* userdata) {
- auto* lambda = static_cast<Lambda*>(userdata);
- lambda->client->GetEventManager()->SetFutureReady(lambda->futureIDInternal);
- delete lambda;
- },
- lambda});
-
QueueOnSubmittedWorkDoneCmd cmd;
cmd.queueId = GetWireId();
cmd.signalValue = 0;
- cmd.requestSerial = serial;
+ cmd.future = {futureIDInternal};
client->SerializeCommand(cmd);
return {futureIDInternal};
@@ -120,16 +118,4 @@
GetClient()->SerializeCommand(cmd);
}
-void Queue::CancelCallbacksForDisconnect() {
- ClearAllCallbacks(WGPUQueueWorkDoneStatus_DeviceLost);
-}
-
-void Queue::ClearAllCallbacks(WGPUQueueWorkDoneStatus status) {
- mOnWorkDoneRequests.CloseAll([status](OnWorkDoneData* request) {
- if (request->callback != nullptr) {
- request->callback(status, request->userdata);
- }
- });
-}
-
} // namespace dawn::wire::client
diff --git a/src/dawn/wire/client/Queue.h b/src/dawn/wire/client/Queue.h
index 17edf06..3a36ee3 100644
--- a/src/dawn/wire/client/Queue.h
+++ b/src/dawn/wire/client/Queue.h
@@ -28,7 +28,7 @@
using ObjectBase::ObjectBase;
~Queue() override;
- bool OnWorkDoneCallback(uint64_t requestSerial, WGPUQueueWorkDoneStatus status);
+ bool OnWorkDoneCallback(WGPUFuture future, WGPUQueueWorkDoneStatus status);
// Dawn API
void OnSubmittedWorkDone(uint64_t signalValue,
@@ -41,16 +41,6 @@
size_t dataSize,
const WGPUTextureDataLayout* dataLayout,
const WGPUExtent3D* writeSize);
-
- private:
- void CancelCallbacksForDisconnect() override;
- void ClearAllCallbacks(WGPUQueueWorkDoneStatus status);
-
- struct OnWorkDoneData {
- WGPUQueueWorkDoneCallback callback = nullptr;
- void* userdata = nullptr;
- };
- RequestTracker<OnWorkDoneData> mOnWorkDoneRequests;
};
} // namespace dawn::wire::client
diff --git a/src/dawn/wire/server/Server.h b/src/dawn/wire/server/Server.h
index 61058ba..4f737b4 100644
--- a/src/dawn/wire/server/Server.h
+++ b/src/dawn/wire/server/Server.h
@@ -92,7 +92,7 @@
ObjectHandle buffer;
WGPUBuffer bufferObj;
- uint64_t requestSerial;
+ WGPUFuture future;
uint64_t offset;
uint64_t size;
WGPUMapModeFlags mode;
@@ -116,7 +116,7 @@
using CallbackUserdata::CallbackUserdata;
ObjectHandle queue;
- uint64_t requestSerial;
+ WGPUFuture future;
};
struct CreatePipelineAsyncUserData : CallbackUserdata {
diff --git a/src/dawn/wire/server/ServerBuffer.cpp b/src/dawn/wire/server/ServerBuffer.cpp
index 0213779..d37067e 100644
--- a/src/dawn/wire/server/ServerBuffer.cpp
+++ b/src/dawn/wire/server/ServerBuffer.cpp
@@ -52,7 +52,7 @@
}
WireResult Server::DoBufferMapAsync(Known<WGPUBuffer> buffer,
- uint64_t requestSerial,
+ WGPUFuture future,
WGPUMapModeFlags mode,
uint64_t offset64,
uint64_t size64) {
@@ -61,7 +61,7 @@
std::unique_ptr<MapUserdata> userdata = MakeUserdata<MapUserdata>();
userdata->buffer = buffer.AsHandle();
userdata->bufferObj = buffer->handle;
- userdata->requestSerial = requestSerial;
+ userdata->future = future;
userdata->mode = mode;
// Make sure that the deserialized offset and size are no larger than
@@ -211,7 +211,7 @@
ReturnBufferMapAsyncCallbackCmd cmd;
cmd.buffer = data->buffer;
- cmd.requestSerial = data->requestSerial;
+ cmd.future = data->future;
cmd.status = status;
cmd.readDataUpdateInfoLength = 0;
cmd.readDataUpdateInfo = nullptr;
diff --git a/src/dawn/wire/server/ServerQueue.cpp b/src/dawn/wire/server/ServerQueue.cpp
index d18ffc7..1b16f92 100644
--- a/src/dawn/wire/server/ServerQueue.cpp
+++ b/src/dawn/wire/server/ServerQueue.cpp
@@ -22,7 +22,7 @@
void Server::OnQueueWorkDone(QueueWorkDoneUserdata* data, WGPUQueueWorkDoneStatus status) {
ReturnQueueWorkDoneCallbackCmd cmd;
cmd.queue = data->queue;
- cmd.requestSerial = data->requestSerial;
+ cmd.future = data->future;
cmd.status = status;
SerializeCommand(cmd);
@@ -30,10 +30,10 @@
WireResult Server::DoQueueOnSubmittedWorkDone(Known<WGPUQueue> queue,
uint64_t signalValue,
- uint64_t requestSerial) {
+ WGPUFuture future) {
auto userdata = MakeUserdata<QueueWorkDoneUserdata>();
userdata->queue = queue.AsHandle();
- userdata->requestSerial = requestSerial;
+ userdata->future = future;
mProcs.queueOnSubmittedWorkDone(queue->handle, signalValue,
ForwardToServer<&Server::OnQueueWorkDone>, userdata.release());