
Program Listing for File XPUStream.h

Return to documentation for file (c10/xpu/XPUStream.h)

#pragma once

#include <c10/core/Stream.h>
#include <c10/core/impl/GPUTrace.h>
#include <c10/xpu/XPUFunctions.h>

namespace c10::xpu {

/*
 * Note [Stream Management]
 *
 * An XPUStream is an abstraction of an actual SYCL queue in which SYCL kernels
 * can execute. Currently, there are several pools per device to manage SYCL
 * queues, and a device's pool is lazily created.
 *
 * There are two pools per device. The first pool contains "normal priority"
 * queues. The second pool is the "high priority" queues. There are 32 queues
 * per pool per device, and when a queue is requested one of these queues is
 * returned round-robin. That is, the first queue requested is at index 0, the
 * second at index 1... to index 31, then index 0 again.
 *
 * This means that if 33 queues are requested, the first and last queues
 * requested are actually the same queue (under the covers) and kernels enqueued
 * on them cannot run concurrently.
 *
 * It is safe to enqueue a kernel on the same queue from two different
 * threads, as described by the SYCL specification.
 */

/// Number of distinct stream priority levels fixed at compile time; used by
/// XPUStream::priority_range() to derive the valid priority interval.
static constexpr int max_compile_time_stream_priorities{3};

/*
 * This serves as a wrapper around c10::Stream and acts as a representation for
 * a SYCL queue, which allows asynchronous execution of XPU tasks.
 */
class C10_XPU_API XPUStream {
  enum Unchecked { UNCHECKED };

  explicit XPUStream(Stream stream) : stream_(stream) {
    TORCH_CHECK(stream_.device_type() == DeviceType::XPU);

  explicit XPUStream(Unchecked, Stream stream) : stream_(stream) {}

  bool operator==(const XPUStream& other) const noexcept {
    return unwrap() == other.unwrap();

  bool operator!=(const XPUStream& other) const noexcept {
    return unwrap() != other.unwrap();

  operator sycl::queue&() const {
    return queue();

  operator Stream() const {
    return unwrap();

  DeviceType device_type() const {
    return DeviceType::XPU;

  DeviceIndex device_index() const {
    return stream_.device_index();

  Device device() const {
    return Device(DeviceType::XPU, device_index());

  StreamId id() const {

  bool query() const {
    return queue().ext_oneapi_empty();

  void synchronize() const {
    const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
    if (C10_UNLIKELY(interp)) {
          c10::kXPU, reinterpret_cast<uintptr_t>(&queue()));

  int priority() const;

  sycl::queue& queue() const;

  Stream unwrap() const {
    return stream_;

  struct c10::StreamData3 pack3() const {
    return stream_.pack3();

  static XPUStream unpack3(
      StreamId stream_id,
      DeviceIndex device_index,
      DeviceType device_type) {
    return XPUStream(Stream::unpack3(stream_id, device_index, device_type));

  static std::tuple<int, int> priority_range() {
    // See Note [XPU Stream priorities]
    return std::make_tuple(1, -max_compile_time_stream_priorities + 2);

  Stream stream_;

getStreamFromPool(const bool isHighPriority = false, DeviceIndex device = -1);

getStreamFromPool(const int priority, DeviceIndex device = -1);

getStreamFromExternal(sycl::queue* ext_queue, DeviceIndex device_index);

/// Returns the current XPUStream for `device`.
/// NOTE(review): `device = -1` presumably means "the current device" --
/// confirm against the implementation.
C10_XPU_API XPUStream getCurrentXPUStream(DeviceIndex device = -1);

/// Makes `stream` the current XPU stream.
/// NOTE(review): presumably applies to the device `stream` belongs to and
/// leaves the current device unchanged -- confirm.
C10_XPU_API void setCurrentXPUStream(XPUStream stream);

/// Writes a textual representation of `s` to `stream` and returns `stream`.
C10_XPU_API std::ostream& operator<<(std::ostream& stream, const XPUStream& s);

/// Synchronizes the streams associated with `device`.
/// NOTE(review): exact scope (pool streams only vs. all queues) and the
/// meaning of `device = -1` are not visible here -- confirm.
C10_XPU_API void syncStreamsOnDevice(DeviceIndex device = -1);

} // namespace c10::xpu

namespace std {
/// std::hash specialization so XPUStream can be used as a key in unordered
/// containers. The hash is that of the underlying c10::Stream, which keeps it
/// consistent with XPUStream::operator== (also defined on the unwrapped
/// Stream).
template <>
struct hash<c10::xpu::XPUStream> {
  size_t operator()(c10::xpu::XPUStream s) const noexcept {
    return std::hash<c10::Stream>{}(s.unwrap());
  }
};
} // namespace std


Access comprehensive developer documentation for PyTorch

View Docs


Get in-depth tutorials for beginners and advanced developers

View Tutorials


Find development resources and get your questions answered

View Resources