Shortcuts

Program Listing for File XPUStream.h

Return to documentation for file (c10/xpu/XPUStream.h)

#pragma once

#include <c10/core/Stream.h>
#include <c10/core/impl/GPUTrace.h>
#include <c10/xpu/XPUFunctions.h>

namespace c10::xpu {

/*
 * Note [Stream Management]
 *
 * An XPUStream is an abstraction of an actual SYCL queue in which SYCL kernel
 * can execute. Currently, there are several pools per device to manage SYCL
 * queue, and a device's pool is lazily created.
 *
 * There are two pools per device. The first pool contains "normal priority"
 * queues. The second pool is the "high priority" queues. There are 32 queues in
 * per pool per device, and when a queue is requested one of these queues is
 * returned round-robin. That is, the first queue requested is at index 0, the
 * second at index 1... to index 31, then index 0 again.
 *
 * This means that if 33 queues are requested, the first and last queues
 * requested are actually the same queue (under the covers) and kernels enqueued
 * on them cannot run concurrently.
 *
 * It is safe to enqueue a kernel on the same queue from two different
 * threads as the SYCL specification described.
 */

static constexpr int max_compile_time_stream_priorities = 2;

/*
 * This serves as a wrapper around c10::Stream and acts as a representation for
 * a SYCL queue, which allows asynchronous execution of XPU tasks.
 */
class C10_XPU_API XPUStream {
 public:
  enum Unchecked { UNCHECKED };

  explicit XPUStream(Stream stream) : stream_(stream) {
    TORCH_CHECK(stream_.device_type() == DeviceType::XPU);
  }

  explicit XPUStream(Unchecked, Stream stream) : stream_(stream) {}

  bool operator==(const XPUStream& other) const noexcept {
    return unwrap() == other.unwrap();
  }

  bool operator!=(const XPUStream& other) const noexcept {
    return unwrap() != other.unwrap();
  }

  operator sycl::queue&() const {
    return queue();
  }

  operator Stream() const {
    return unwrap();
  }

  DeviceType device_type() const {
    return DeviceType::XPU;
  }

  DeviceIndex device_index() const {
    return stream_.device_index();
  }

  Device device() const {
    return Device(DeviceType::XPU, device_index());
  }

  StreamId id() const {
    return stream_.id();
  }

  bool query() const {
    return queue().ext_oneapi_empty();
  }

  void synchronize() const {
    queue().wait_and_throw();
    const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
    if (C10_UNLIKELY(interp)) {
      (*interp)->trace_gpu_stream_synchronization(
          c10::kXPU, reinterpret_cast<uintptr_t>(&queue()));
    }
  }

  int priority() const;

  sycl::queue& queue() const;

  Stream unwrap() const {
    return stream_;
  }

  struct c10::StreamData3 pack3() const {
    return stream_.pack3();
  }

  static XPUStream unpack3(
      StreamId stream_id,
      DeviceIndex device_index,
      DeviceType device_type) {
    return XPUStream(Stream::unpack3(stream_id, device_index, device_type));
  }

  static std::tuple<int, int> priority_range() {
    return std::make_tuple(0, -max_compile_time_stream_priorities + 1);
  }

 private:
  Stream stream_;
};

C10_XPU_API XPUStream
getStreamFromPool(const bool isHighPriority = false, DeviceIndex device = -1);

C10_XPU_API XPUStream
getStreamFromPool(const int priority, DeviceIndex device = -1);

C10_XPU_API XPUStream getCurrentXPUStream(DeviceIndex device = -1);

C10_XPU_API void setCurrentXPUStream(XPUStream stream);

C10_XPU_API std::ostream& operator<<(std::ostream& stream, const XPUStream& s);

C10_XPU_API void syncStreamsOnDevice(DeviceIndex device = -1);

} // namespace c10::xpu

namespace std {
template <>
struct hash<c10::xpu::XPUStream> {
  size_t operator()(c10::xpu::XPUStream s) const noexcept {
    return std::hash<c10::Stream>{}(s.unwrap());
  }
};
} // namespace std

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources