[Feat]: Add RayStorageClient to support the RDT feature of Ray #102
Evelynn-V wants to merge 4 commits into
| class RayStorageClient(TransferQueueStorageKVClient): | ||
|
|
||
| def __init__(self, config: dict[str, Any]): | ||
| if not ray.is_initialized(): |
ray.init() is necessary. However, this check should probably not be performed by the storage client; it might be the responsibility of the user or the upper-level interface.
A tricky point here: the Ray driver does not support ray.put(v, _tensor_transport="nixl"), so if Ray is initialized here, this process becomes a driver and the following put() will fail. I would prefer to raise an error here instead.
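The "raise an error instead of calling ray.init()" idea could look roughly like the sketch below. The helper name require_ray_initialized and the error message are hypothetical; the check is passed in as a callable so the snippet runs without Ray installed. In the client it would presumably be called with ray.is_initialized.

```python
from typing import Callable


def require_ray_initialized(is_initialized: Callable[[], bool]) -> None:
    """Fail fast rather than turning the current process into a Ray driver.

    Hypothetical helper: `is_initialized` stands in for `ray.is_initialized`.
    """
    if not is_initialized():
        raise RuntimeError(
            "RayStorageClient expects an already-initialized Ray context; "
            "it will not call ray.init() itself."
        )
```

With this shape, the decision about where and how Ray is started stays with the caller, which also matches the earlier comment that the storage client should not own that check.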
| values: list[Tensor] = self._create_empty_tensorlist(shapes=shapes, dtypes=dtypes) | ||
| storage = ray.get_actor("RayGpuObjectRefStorage") | ||
|
|
||
| gpu_obj_refs = ray.get(storage.get_gpu_obj_ref.remote(keys)) | ||
| # values = ray.get(gpu_obj_refs) | ||
| values = ray.get(gpu_obj_refs, _tensor_transport="nixl") |
values is reassigned between line 112 and line 115, so _create_empty_tensorlist() is redundant; we can remove it.
| if not ray.is_initialized(): | ||
| ray.init() |
| if self.use_gpu: | ||
| gpu_ids = ray.get_gpu_ids() | ||
| if gpu_ids: | ||
| self.device_id = gpu_ids[0] | ||
| else: | ||
| self.device_id = config.get("device_id", 0) | ||
| torch.cuda.set_device(self.device_id) |
Users may call torch.cuda.set_device() in __main__. It might be better to first check whether the gpu_device has already been set in the environment. If not, then proceed to determine the self.device_id (and self.device_id should probably prioritize the value specified in the config).
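One possible reading of that priority order, as a minimal sketch: a device the user already selected wins, then the config value, then the first GPU Ray assigned, then 0. The function resolve_device_id and its preset_device_id parameter are hypothetical, and how "already set" is detected is deliberately left to the caller.

```python
from typing import Any, Optional, Sequence


def resolve_device_id(
    config: dict[str, Any],
    ray_gpu_ids: Sequence[Any],
    preset_device_id: Optional[int] = None,
) -> int:
    """Pick a device id without overriding a choice the user already made."""
    # 1. A device the user selected earlier (e.g. in __main__) is kept as-is.
    if preset_device_id is not None:
        return preset_device_id
    # 2. An explicit config value takes priority over what Ray reports.
    if config.get("device_id") is not None:
        return int(config["device_id"])
    # 3. Otherwise use the first GPU assigned by Ray, as the PR does today.
    if ray_gpu_ids:
        return int(ray_gpu_ids[0])
    # 4. Same default as config.get("device_id", 0) in the PR.
    return 0
```

In the client, ray_gpu_ids would be the result of ray.get_gpu_ids(), and torch.cuda.set_device() would only need to be called when preset_device_id is None.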
|
@coderabbitai review |
✅ Actions performed: review triggered.
|
Pull Request Overview
This PR adds Ray-based distributed storage support to the transfer queue system, enabling tensor storage and retrieval using Ray's distributed object store with optional NIXL transport for GPU tensors.
- Implements RayStorageClient for storing and retrieving tensors via Ray's object store
- Adds RayKVStorageManager to manage Ray-based key-value storage operations
- Includes comprehensive tests for basic operations and multi-client concurrent scenarios
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| transfer_queue/storage/clients/ray_storage_client.py | Implements Ray storage client with GPU/CPU support and NIXL transport for distributed tensor operations |
| transfer_queue/storage/managers/ray_kv_manager.py | Adds Ray-based KV storage manager with device validation |
| transfer_queue/storage/clients/__init__.py | Exports the new RayStorageClient class |
| tests/test_ray_storage_client.py | Provides unit tests for Ray storage client operations |
| Store tensors to remote storage. | ||
| Args: | ||
| keys (list): List of string keys | ||
| values (list): List of torch.Tensor on NPU |
The docstring incorrectly states 'List of torch.Tensor on NPU' but this client supports both GPU (CUDA) and CPU tensors, not NPU. Update to reflect actual supported devices.
| values (list): List of torch.Tensor on NPU | |
| values (list): List of torch.Tensor on GPU (CUDA) or CPU |
| shapes (list): Expected shapes of returned tensors | ||
| dtypes (list): Expected dtypes of returned tensors | ||
| Returns: | ||
| list: List of retrieved NPU tensors |
The docstring incorrectly states 'List of retrieved NPU tensors' but this returns either CUDA or CPU tensors, not NPU. Update to accurately describe the return type.
| Create a list of empty GPU tensors with given shapes and dtypes. | ||
| Args: | ||
| shapes (list): List of tensor shapes (e.g., [(3,), (2, 4)]) | ||
| dtypes (list): List of torch dtypes (e.g., [torch.float32, torch.int64]) | ||
| Returns: | ||
| list: List of uninitialized GPU tensors |
The docstring says 'empty GPU tensors' but when use_gpu is False, this method creates CPU tensors. Update to 'Create a list of empty tensors with given shapes and dtypes' for accuracy.
| Create a list of empty GPU tensors with given shapes and dtypes. | |
| Args: | |
| shapes (list): List of tensor shapes (e.g., [(3,), (2, 4)]) | |
| dtypes (list): List of torch dtypes (e.g., [torch.float32, torch.int64]) | |
| Returns: | |
| list: List of uninitialized GPU tensors | |
| Create a list of empty tensors with given shapes and dtypes. | |
| The tensors are created on the device specified by self.device (CPU or GPU). | |
| Args: | |
| shapes (list): List of tensor shapes (e.g., [(3,), (2, 4)]) | |
| dtypes (list): List of torch dtypes (e.g., [torch.float32, torch.int64]) | |
| Returns: | |
| list: List of uninitialized tensors |
| shapes (list): List of tensor shapes (e.g., [(3,), (2, 4)]) | ||
| dtypes (list): List of torch dtypes (e.g., [torch.float32, torch.int64]) | ||
| Returns: | ||
| list: List of uninitialized GPU tensors |
The return description states 'GPU tensors' but the method can return CPU tensors when use_gpu is False. Update to 'list: List of uninitialized tensors'.
| list: List of uninitialized GPU tensors | |
| list: List of uninitialized tensors |
| if len(dtypes) != len(shapes): | ||
| raise ValueError("Length of dtypes must equal length of shapes") | ||
|
|
||
| values: list[Tensor] = self._create_empty_tensorlist(shapes=shapes, dtypes=dtypes) |
The variable values is assigned from _create_empty_tensorlist but then immediately overwritten on line 116 without being used. Remove this unused assignment.
| values: list[Tensor] = self._create_empty_tensorlist(shapes=shapes, dtypes=dtypes) |
|
|
||
| gpu_obj_refs = ray.get(storage.get_gpu_obj_ref.remote(keys)) | ||
| # values = ray.get(gpu_obj_refs) | ||
| values = ray.get(gpu_obj_refs, _tensor_transport="nixl") |
The code unconditionally uses _tensor_transport='nixl' even when self.use_gpu is False. This should be conditional like in the put method to avoid errors when running on CPU-only systems.
| values = ray.get(gpu_obj_refs, _tensor_transport="nixl") | |
| if self.use_gpu: | |
| values = ray.get(gpu_obj_refs, _tensor_transport="nixl") | |
| else: | |
| values = ray.get(gpu_obj_refs) |
| from transfer_queue.storage.managers.base import KVStorageManager | ||
|
|
||
|
|
||
| class RayKVStorageManager(KVStorageManager): |
The RayKVStorageManager class is not registered with the TransferQueueStorageManagerFactory and is not exported in the __init__.py. This makes it inaccessible through the factory pattern used elsewhere in the codebase.
|
|
||
| for i, client in enumerate(clients): | ||
| keys = [f"client_{i}_tensor_{j}" for j in range(3)] | ||
| values = [torch.randn(10, 10) * i for _ in range(3)] |
The variable i is used in the list comprehension but the loop variable is _. This means all three tensors have the same multiplier. Change _ to j or use i consistently.
| values = [torch.randn(10, 10) * i for _ in range(3)] | |
| values = [torch.randn(10, 10) * j for j in range(3)] |
| import torch | ||
| import sys | ||
| from pathlib import Path | ||
| from tensordict import TensorDict |
Import of 'TensorDict' is not used.
| from tensordict import TensorDict |
Actionable comments posted: 2
| if len(dtypes) != len(shapes): | ||
| raise ValueError("Length of dtypes must equal length of shapes") | ||
|
|
||
| values: list[Tensor] = self._create_empty_tensorlist(shapes=shapes, dtypes=dtypes) | ||
| storage = ray.get_actor("RayGpuObjectRefStorage") | ||
|
|
||
| gpu_obj_refs = ray.get(storage.get_gpu_obj_ref.remote(keys)) | ||
| # values = ray.get(gpu_obj_refs) | ||
| values = ray.get(gpu_obj_refs, _tensor_transport="nixl") | ||
| return values |
Handle optional shapes/dtypes before calling len()
get() declares shapes/dtypes as optional but immediately calls len() on them, so any caller using the defaults hits a TypeError. Enforce that both are provided (or supply a fallback) before taking their length.
Please apply:
- if len(dtypes) != len(shapes):
-     raise ValueError("Length of dtypes must equal length of shapes")
-
- values: list[Tensor] = self._create_empty_tensorlist(shapes=shapes, dtypes=dtypes)
+ if shapes is None or dtypes is None:
+     raise ValueError("Both shapes and dtypes are required when retrieving tensors")
+ if len(dtypes) != len(shapes):
+     raise ValueError("Length of dtypes must equal length of shapes")
🤖 Prompt for AI Agents
In transfer_queue/storage/clients/ray_storage_client.py around lines 108-117,
the method treats shapes and dtypes as optional but calls len() on them
immediately, causing a TypeError when they are None; add a guard before the
length check to ensure both shapes and dtypes are provided (or explicitly set
sensible defaults) — e.g., if either is None raise a clear ValueError like
"shapes and dtypes must be provided" (or derive defaults) before using len(),
then proceed with creating tensors and fetching GPU refs as before.
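As a standalone illustration of the guard described above, the check can be pulled into a small helper. The name check_shapes_and_dtypes is hypothetical; the error messages are taken from the suggestion in this thread.

```python
from typing import Any, Optional, Sequence


def check_shapes_and_dtypes(
    shapes: Optional[Sequence[Any]], dtypes: Optional[Sequence[Any]]
) -> int:
    """Validate the optional arguments and return the number of expected tensors."""
    # Guard first: len(None) would raise a TypeError with an unhelpful message.
    if shapes is None or dtypes is None:
        raise ValueError("Both shapes and dtypes are required when retrieving tensors")
    if len(dtypes) != len(shapes):
        raise ValueError("Length of dtypes must equal length of shapes")
    return len(shapes)
```

If get() later grows a fallback for non-tensor objects, the first branch is the natural place to switch to the plain object-store path instead of raising.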
| gpu_obj_refs = ray.get(storage.get_gpu_obj_ref.remote(keys)) | ||
| # values = ray.get(gpu_obj_refs) | ||
| values = ray.get(gpu_obj_refs, _tensor_transport="nixl") | ||
| return values |
Match tensor transport with how the refs were stored
In the CPU path, put() uses the default object store, but get() forces _tensor_transport="nixl". If NIXL isn’t installed or the refs came from the object store (the CPU case), ray.get raises. Pick the transport conditionally so CPU callers use the object store while GPU callers continue with NIXL. (docs.ray.io)
Suggested fix:
- gpu_obj_refs = ray.get(storage.get_gpu_obj_ref.remote(keys))
- # values = ray.get(gpu_obj_refs)
- values = ray.get(gpu_obj_refs, _tensor_transport="nixl")
+ gpu_obj_refs = ray.get(storage.get_gpu_obj_ref.remote(keys))
+ if self.use_gpu:
+     values = ray.get(gpu_obj_refs, _tensor_transport="nixl")
+ else:
+     values = ray.get(gpu_obj_refs)
📝 Committable suggestion
| gpu_obj_refs = ray.get(storage.get_gpu_obj_ref.remote(keys)) | |
| # values = ray.get(gpu_obj_refs) | |
| values = ray.get(gpu_obj_refs, _tensor_transport="nixl") | |
| return values | |
| gpu_obj_refs = ray.get(storage.get_gpu_obj_ref.remote(keys)) | |
| if self.use_gpu: | |
| values = ray.get(gpu_obj_refs, _tensor_transport="nixl") | |
| else: | |
| values = ray.get(gpu_obj_refs) | |
| return values |
🤖 Prompt for AI Agents
In transfer_queue/storage/clients/ray_storage_client.py around lines 114-117,
the code unconditionally calls ray.get(..., _tensor_transport="nixl") which will
fail for refs stored in the default CPU object store or when NIXL isn't
available; change this to try the NIXL transport first and if ray.get raises
(TypeError/RuntimeError or any RayError indicating unsupported transport), fall
back to calling ray.get without the _tensor_transport argument so CPU-backed
refs succeed — implement a try/except around the ray.get call and return the
successful result from either the NIXL attempt or the default fallback.
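The try/except variant described in this prompt differs from the if self.use_gpu suggestion above, so a rough sketch may help compare them. The function get_with_transport_fallback is hypothetical, and fetch stands in for ray.get so the snippet runs without Ray. The exact exception types Ray raises for an unsupported transport are an assumption here.

```python
from typing import Any, Callable, Sequence


def get_with_transport_fallback(fetch: Callable[..., Any], refs: Sequence[Any]) -> Any:
    """Try the NIXL transport first, then fall back to the default object store."""
    try:
        return fetch(refs, _tensor_transport="nixl")
    except (TypeError, RuntimeError):
        # Assumption: an unsupported or unavailable transport surfaces as one of
        # these exception types; the real set may be wider (e.g. Ray errors).
        return fetch(refs)
```

One trade-off worth noting: the fallback can hide genuine NIXL failures on GPU nodes, whereas the explicit use_gpu branch keeps the chosen transport visible at the call site.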
| from transfer_queue.storage.clients.base import TransferQueueStorageKVClient | ||
| from transfer_queue.storage.clients.factory import StorageClientFactory | ||
|
|
||
| @ray.remote |
Maybe we should set @ray.remote(max_concurrency=XX) to enable concurrent remote calls for better performance.
| from transfer_queue.storage.clients.factory import StorageClientFactory | ||
|
|
||
| @ray.remote | ||
| class RayGpuObjectRefStorage: |
I think this class can also be used when RDT is not available, so maybe it should be called RayObjectRefStorage?
| Store tensors to remote storage. | ||
| Args: | ||
| keys (list): List of string keys | ||
| values (list): List of torch.Tensor on NPU |
Users may also use Ray to transport torch.Tensor on CPU.
| values (list): List of torch.Tensor on NPU | ||
| """ | ||
| if not isinstance(keys, list) or not isinstance(values, list): | ||
| raise ValueError("keys and values must be lists") |
| raise ValueError("keys and values must be lists") | |
| raise ValueError(f"keys and values must be lists, but got {type(keys)} and {type(values)}") |
| if not isinstance(value, torch.Tensor): | ||
| raise ValueError(f"Expected torch.Tensor, got {type(value)}") | ||
|
|
||
| # TODO: NIXL can only be initialized in an environment with GPU, even if data is transferred on the cpu. |
If this is already fixed, simply delete the TODO.
| obj_refs = [ray.put(v) for v in values] | ||
| # obj_refs = [ray.put(v, _tensor_transport="nixl") for v in values] | ||
|
|
||
| storage = RayGpuObjectRefStorage.options( |
Maybe we can do this in __init__ and use self.storage to avoid frequent interaction with the raylet.
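A minimal sketch of the "resolve once in __init__" idea, with the actor lookup injected as a callable so it runs without Ray. The class name StorageHandleCache and the resolve_handle parameter are hypothetical; in the client, resolve_handle would wrap the RayGpuObjectRefStorage.options(...) call currently made inside put().

```python
from typing import Any, Callable


class StorageHandleCache:
    """Resolve the storage actor handle once and reuse it for every call."""

    def __init__(self, resolve_handle: Callable[[], Any]) -> None:
        # One lookup at construction time; put()/get()/delete() would then
        # use self.storage instead of looking the actor up on each call.
        self.storage = resolve_handle()
```

This assumes the named actor outlives the client; if the actor can be restarted, the handle would need to be refreshed on failure.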
| raise ValueError("Length of dtypes must equal length of shapes") | ||
|
|
||
| values: list[Tensor] = self._create_empty_tensorlist(shapes=shapes, dtypes=dtypes) | ||
| storage = ray.get_actor("RayGpuObjectRefStorage") |
| Returns: | ||
| list: List of retrieved NPU tensors | ||
| """ | ||
| if len(dtypes) != len(shapes): |
shapes and dtypes can be None, which will cause a TypeError when calling len(dtypes).
| if len(dtypes) != len(shapes): | ||
| raise ValueError("Length of dtypes must equal length of shapes") | ||
|
|
||
| values: list[Tensor] = self._create_empty_tensorlist(shapes=shapes, dtypes=dtypes) |
We need fallback logic for the case where shapes and dtypes are not available, for example when Ray is used to store non-tensor objects such as np.array, str, etc.
Check automatically whether the value is a tensor that can go through RDT; when it is not, use the ordinary Ray object store.
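The automatic check could be sketched as a small selector that decides the transport per value. Everything named here is hypothetical: pick_transport, and the is_tensor predicate, which in the client would presumably be lambda v: isinstance(v, torch.Tensor). It is injected so the snippet runs without torch.

```python
from typing import Any, Callable, Optional


def pick_transport(
    value: Any, use_gpu: bool, is_tensor: Callable[[Any], bool]
) -> Optional[str]:
    """Return "nixl" for tensors when RDT is usable, else None (plain object store)."""
    if use_gpu and is_tensor(value):
        return "nixl"
    # Non-tensor objects (np.array, str, ...) and CPU-only setups fall back
    # to the ordinary Ray object store.
    return None
```

A caller would then pass _tensor_transport only when the result is not None, as in the commented-out ray.put(v, _tensor_transport="nixl") line in put(). The same decision has to be recorded or repeated on the get() side so both ends use the same transport.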
| Args: | ||
| keys (list): List of keys to delete | ||
| """ | ||
| storage = ray.get_actor("RayGpuObjectRefStorage") |
|
|
||
| class RayKVStorageManager(KVStorageManager): | ||
| def __init__(self, config: dict[str, Any]): | ||
| device_id = config.get("device_id", None) |
It has been deleted because we don't care about device_id.
|
|
||
| self.use_gpu = torch.cuda.is_available() | ||
|
|
||
| if self.use_gpu: |
Does the NPU support RDT? In other words, do we need to add a monkey patch to the transfer queue to support this feature?
Probably not feasible. RDT is based on NIXL for transmission, while NIXL only supports GPU and CPU.
Summary
Implemented the class RayStorageClient and tested it on CPU using the object store (using NIXL on GPUs requires further testing).
Change
- Added transfer_queue/storage/managers/ray_kv_manager.py: class RayKVStorageManager, inherited from class KVStorageManager, implements the verification of the device_id configuration and the initialization of Ray.
- Added transfer_queue/storage/clients/ray_storage_client.py: classes RayStorageClient and RayGpuObjectRefStorage, encapsulating the invocation of the NIXL transport interface used in Ray.
- Added simple unit tests: tests/test_ray_storage_client.py.
Testing
- Test RayStorageClient on CPU (requires installing ray >= 2.50):
pytest tests/test_ray_storage_client.py::test_ray_storage_put_get
- Test RayStorageClient on GPU:
pytest tests/test_ray_storage_client.py::test_nixl_vs_object_store_performance -s
Related Links
Issue: [Feat]: Add general key-value storage interface (KVStorageManager) & yuanrong storage backend
Ray-RDT: Ray Direct Transport Usage with NIXL (CPUs or NVIDIA GPUs)
TODO