Program Listing for File XPUStream.h
↰ Return to documentation for file (c10/xpu/XPUStream.h)
#pragma once
#include <c10/core/Stream.h>
#include <c10/core/impl/GPUTrace.h>
#include <c10/xpu/XPUFunctions.h>
namespace c10::xpu {
/*
* Note [Stream Management]
*
* An XPUStream is an abstraction of an actual SYCL queue in which SYCL kernels
* can execute. Currently, there are several pools per device to manage SYCL
* queues, and a device's pools are lazily created.
*
* There are two pools per device. The first pool contains "normal priority"
* queues. The second pool contains "high priority" queues. There are 32 queues
* per pool per device, and when a queue is requested one of these queues is
* returned round-robin. That is, the first queue requested is at index 0, the
* second at index 1, and so on up to index 31, then index 0 again.
*
* This means that if 33 queues are requested from the same pool, the first and
* last queues requested are actually the same queue (under the covers) and
* kernels enqueued on them cannot run concurrently.
*
* It is safe to enqueue a kernel on the same queue from two different
* threads, as described by the SYCL specification.
*/
/*
 * Number of stream priority levels fixed at compile time. It matches the two
 * per-device pools ("normal priority" and "high priority") described in
 * Note [Stream Management], and XPUStream::priority_range() derives its
 * second bound from it.
 */
static constexpr int max_compile_time_stream_priorities{2};
/*
* This serves as a wrapper around c10::Stream and acts as a representation for
* a SYCL queue, which allows asynchronous execution of XPU tasks.
*/
class C10_XPU_API XPUStream {
public:
enum Unchecked { UNCHECKED };
explicit XPUStream(Stream stream) : stream_(stream) {
TORCH_CHECK(stream_.device_type() == DeviceType::XPU);
}
explicit XPUStream(Unchecked, Stream stream) : stream_(stream) {}
bool operator==(const XPUStream& other) const noexcept {
return unwrap() == other.unwrap();
}
bool operator!=(const XPUStream& other) const noexcept {
return unwrap() != other.unwrap();
}
operator sycl::queue&() const {
return queue();
}
operator Stream() const {
return unwrap();
}
DeviceType device_type() const {
return DeviceType::XPU;
}
DeviceIndex device_index() const {
return stream_.device_index();
}
Device device() const {
return Device(DeviceType::XPU, device_index());
}
StreamId id() const {
return stream_.id();
}
bool query() const {
return queue().ext_oneapi_empty();
}
void synchronize() const {
queue().wait_and_throw();
const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
if (C10_UNLIKELY(interp)) {
(*interp)->trace_gpu_stream_synchronization(
c10::kXPU, reinterpret_cast<uintptr_t>(&queue()));
}
}
int priority() const;
sycl::queue& queue() const;
Stream unwrap() const {
return stream_;
}
struct c10::StreamData3 pack3() const {
return stream_.pack3();
}
static XPUStream unpack3(
StreamId stream_id,
DeviceIndex device_index,
DeviceType device_type) {
return XPUStream(Stream::unpack3(stream_id, device_index, device_type));
}
static std::tuple<int, int> priority_range() {
return std::make_tuple(0, -max_compile_time_stream_priorities + 1);
}
private:
Stream stream_;
};
/*
 * Overloads that return an XPUStream taken from a stream pool (see
 * Note [Stream Management] for the pool layout and round-robin reuse).
 * Only the declarations are visible in this header.
 *
 * The first overload chooses by a boolean flag (default: false), the second
 * by an integer priority value.
 * NOTE(review): `device = -1` presumably selects the current device, and the
 * accepted `priority` values presumably follow XPUStream::priority_range() -
 * confirm both against the definitions.
 */
C10_XPU_API XPUStream
getStreamFromPool(const bool isHighPriority = false, DeviceIndex device = -1);
C10_XPU_API XPUStream
getStreamFromPool(const int priority, DeviceIndex device = -1);
/*
 * Accessor pair for the "current" XPU stream. Only the declarations are
 * visible in this header.
 * NOTE(review): `device = -1` presumably means the current device, and
 * setCurrentXPUStream presumably applies to the device of the stream passed
 * in - confirm against the definitions.
 */
C10_XPU_API XPUStream getCurrentXPUStream(DeviceIndex device = -1);
C10_XPU_API void setCurrentXPUStream(XPUStream stream);
/*
 * Stream-insertion operator for XPUStream. Takes the output stream and the
 * XPUStream by reference and returns a std::ostream reference; the printed
 * format is determined by the definition, which is not in this header.
 */
C10_XPU_API std::ostream& operator<<(std::ostream& stream, const XPUStream& s);
/*
 * Device-wide stream synchronization entry point. Only the declaration is
 * visible in this header.
 * NOTE(review): presumably waits on the pooled queues of `device`, with -1
 * meaning the current device - confirm against the definition.
 */
C10_XPU_API void syncStreamsOnDevice(DeviceIndex device = -1);
} // namespace c10::xpu
namespace std {
/*
 * std::hash support for XPUStream: the hash of an XPUStream is the
 * std::hash<c10::Stream> of the stream it wraps.
 */
template <>
struct hash<c10::xpu::XPUStream> {
  size_t operator()(c10::xpu::XPUStream s) const noexcept {
    const std::hash<c10::Stream> stream_hasher{};
    return stream_hasher(s.unwrap());
  }
};
} // namespace std