Skip to content

Commit

Permalink
Stream._MembersNeededForFinalize() approach.
Browse files Browse the repository at this point in the history
  • Loading branch information
rwgk committed Nov 27, 2024
1 parent 74d2859 commit 9843fa5
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 49 deletions.
2 changes: 1 addition & 1 deletion cuda_core/cuda/core/experimental/_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def launch(kernel, config, *kernel_args):
drv_cfg = cuda.CUlaunchConfig()
drv_cfg.gridDimX, drv_cfg.gridDimY, drv_cfg.gridDimZ = config.grid
drv_cfg.blockDimX, drv_cfg.blockDimY, drv_cfg.blockDimZ = config.block
drv_cfg.hStream = config.stream._handle
drv_cfg.hStream = config.stream.handle
drv_cfg.sharedMemBytes = config.shmem_size
drv_cfg.numAttrs = 0 # TODO
handle_return(cuda.cuLaunchKernelEx(drv_cfg, int(kernel._handle), args_ptr, 0))
Expand Down
8 changes: 4 additions & 4 deletions cuda_core/cuda/core/experimental/_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def copy_to(self, dst: Buffer = None, *, stream) -> Buffer:
dst = self._mr.allocate(self._size, stream)
if dst._size != self._size:
raise ValueError("buffer sizes mismatch between src and dst")
handle_return(cuda.cuMemcpyAsync(dst._ptr, self._ptr, self._size, stream._handle))
handle_return(cuda.cuMemcpyAsync(dst._ptr, self._ptr, self._size, stream.handle))
return dst

def copy_from(self, src: Buffer, *, stream):
Expand All @@ -151,7 +151,7 @@ def copy_from(self, src: Buffer, *, stream):
raise ValueError("stream must be provided")
if src._size != self._size:
raise ValueError("buffer sizes mismatch between src and dst")
handle_return(cuda.cuMemcpyAsync(self._ptr, src._ptr, self._size, stream._handle))
handle_return(cuda.cuMemcpyAsync(self._ptr, src._ptr, self._size, stream.handle))

def __dlpack__(
self,
Expand Down Expand Up @@ -240,13 +240,13 @@ def __init__(self, dev_id):
def allocate(self, size, stream=None) -> Buffer:
if stream is None:
stream = default_stream()
ptr = handle_return(cuda.cuMemAllocFromPoolAsync(size, self._handle, stream._handle))
ptr = handle_return(cuda.cuMemAllocFromPoolAsync(size, self._handle, stream.handle))
return Buffer(ptr, size, self)

def deallocate(self, ptr, size, stream=None):
if stream is None:
stream = default_stream()
handle_return(cuda.cuMemFreeAsync(ptr, stream._handle))
handle_return(cuda.cuMemFreeAsync(ptr, stream.handle))

@property
def is_device_accessible(self) -> bool:
Expand Down
79 changes: 35 additions & 44 deletions cuda_core/cuda/core/experimental/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,24 @@ class Stream:
"""

__slots__ = (
"__weakref__",
"_handle",
"_nonblocking",
"_priority",
"_owner",
"_builtin",
"_device_id",
"_ctx_handle",
)
class _MembersNeededForFinalize:
__slots__ = ("handle", "owner", "builtin")

def __init__(self, stream_obj, handle, owner, builtin):
self.handle = handle
self.owner = owner
self.builtin = builtin
weakref.finalize(stream_obj, self.close)

def close(self):
if self.owner is None:
if self.handle and not self.builtin:
handle_return(cuda.cuStreamDestroy(self.handle))
else:
self.owner = None
self.handle = None

__slots__ = ("__weakref__", "_mnff", "_nonblocking", "_priority", "_device_id", "_ctx_handle")

def __init__(self):
raise NotImplementedError(
Expand All @@ -71,16 +79,10 @@ def __init__(self):
"available from somewhere else, Stream.from_handle()"
)

def _enable_finalize(self):
self._handle = None
self._owner = None
self._builtin = False
weakref.finalize(self, self.close)

@staticmethod
def _init(obj=None, *, options: Optional[StreamOptions] = None):
self = Stream.__new__(Stream)
self._enable_finalize()
self._mnff = Stream._MembersNeededForFinalize(self, None, None, False)

if obj is not None and options is not None:
raise ValueError("obj and options cannot be both specified")
Expand All @@ -89,9 +91,9 @@ def _init(obj=None, *, options: Optional[StreamOptions] = None):
raise ValueError
info = obj.__cuda_stream__
assert info[0] == 0
self._handle = cuda.CUstream(info[1])
self._mnff.handle = cuda.CUstream(info[1])
# TODO: check if obj is created under the current context/device
self._owner = obj
self._mnff.owner = obj
self._nonblocking = None # delayed
self._priority = None # delayed
self._device_id = None # delayed
Expand All @@ -111,8 +113,8 @@ def _init(obj=None, *, options: Optional[StreamOptions] = None):
else:
priority = high

self._handle = handle_return(cuda.cuStreamCreateWithPriority(flags, priority))
self._owner = None
self._mnff.handle = handle_return(cuda.cuStreamCreateWithPriority(flags, priority))
self._mnff.owner = None
self._nonblocking = nonblocking
self._priority = priority
# don't defer this because we will have to pay a cost for context
Expand All @@ -128,28 +130,23 @@ def close(self):
object will instead have their references released.
"""
if self._owner is None:
if self._handle and not self._builtin:
handle_return(cuda.cuStreamDestroy(self._handle))
else:
self._owner = None
self._handle = None
self._mnff.close()

@property
def __cuda_stream__(self) -> Tuple[int, int]:
"""Return an instance of a __cuda_stream__ protocol."""
return (0, int(self._handle))
return (0, self.handle)

@property
def handle(self) -> int:
"""Return the underlying cudaStream_t pointer address as Python int."""
return int(self._handle)
return int(self._mnff.handle)

@property
def is_nonblocking(self) -> bool:
"""Return True if this is a nonblocking stream, otherwise False."""
if self._nonblocking is None:
flag = handle_return(cuda.cuStreamGetFlags(self._handle))
flag = handle_return(cuda.cuStreamGetFlags(self._mnff.handle))
if flag == cuda.CUstream_flags.CU_STREAM_NON_BLOCKING:
self._nonblocking = True
else:
Expand All @@ -160,13 +157,13 @@ def is_nonblocking(self) -> bool:
def priority(self) -> int:
"""Return the stream priority."""
if self._priority is None:
prio = handle_return(cuda.cuStreamGetPriority(self._handle))
prio = handle_return(cuda.cuStreamGetPriority(self._mnff.handle))
self._priority = prio
return self._priority

def sync(self):
"""Synchronize the stream."""
handle_return(cuda.cuStreamSynchronize(self._handle))
handle_return(cuda.cuStreamSynchronize(self._mnff.handle))

def record(self, event: Event = None, options: EventOptions = None) -> Event:
"""Record an event onto the stream.
Expand Down Expand Up @@ -194,7 +191,7 @@ def record(self, event: Event = None, options: EventOptions = None) -> Event:
event = Event._init(options)
elif not isinstance(event, Event):
raise TypeError("record only takes an Event object")
handle_return(cuda.cuEventRecord(event.handle, self._handle))
handle_return(cuda.cuEventRecord(event.handle, self._mnff.handle))
return event

def wait(self, event_or_stream: Union[Event, Stream]):
Expand Down Expand Up @@ -223,7 +220,7 @@ def wait(self, event_or_stream: Union[Event, Stream]):
discard_event = True

# TODO: support flags other than 0?
handle_return(cuda.cuStreamWaitEvent(self._handle, event, 0))
handle_return(cuda.cuStreamWaitEvent(self._mnff.handle, event, 0))
if discard_event:
handle_return(cuda.cuEventDestroy(event))

Expand All @@ -243,15 +240,15 @@ def device(self) -> Device:
if self._device_id is None:
# Get the stream context first
if self._ctx_handle is None:
self._ctx_handle = handle_return(cuda.cuStreamGetCtx(self._handle))
self._ctx_handle = handle_return(cuda.cuStreamGetCtx(self._mnff.handle))
self._device_id = get_device_from_ctx(self._ctx_handle)
return Device(self._device_id)

@property
def context(self) -> Context:
"""Return the :obj:`Context` associated with this stream."""
if self._ctx_handle is None:
self._ctx_handle = handle_return(cuda.cuStreamGetCtx(self._handle))
self._ctx_handle = handle_return(cuda.cuStreamGetCtx(self._mnff.handle))
if self._device_id is None:
self._device_id = get_device_from_ctx(self._ctx_handle)
return Context._from_ctx(self._ctx_handle, self._device_id)
Expand Down Expand Up @@ -291,22 +288,16 @@ def __cuda_stream__(self):

class _LegacyDefaultStream(Stream):
def __init__(self):
self._enable_finalize()
self._handle = cuda.CUstream(cuda.CU_STREAM_LEGACY)
self._owner = None
self._mnff = Stream._MembersNeededForFinalize(self, cuda.CUstream(cuda.CU_STREAM_LEGACY), None, True)
self._nonblocking = None # delayed
self._priority = None # delayed
self._builtin = True


class _PerThreadDefaultStream(Stream):
def __init__(self):
self._enable_finalize()
self._handle = cuda.CUstream(cuda.CU_STREAM_PER_THREAD)
self._owner = None
self._mnff = Stream._MembersNeededForFinalize(self, cuda.CUstream(cuda.CU_STREAM_PER_THREAD), None, True)
self._nonblocking = None # delayed
self._priority = None # delayed
self._builtin = True


LEGACY_DEFAULT_STREAM = _LegacyDefaultStream()
Expand Down

0 comments on commit 9843fa5

Please sign in to comment.