-
Notifications
You must be signed in to change notification settings - Fork 3
[feat] Support store custom_meta in controller for backend-specific info #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
2. allow kv storage manager put returns their custom metadata per index per field Signed-off-by: tianyi-ge <tianyig@outlook.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds custom metadata support to the storage layer, allowing different storage backends (Ray, Yuanrong, etc.) to store and retrieve backend-specific metadata alongside tensor data. The implementation adds a custom_meta parameter throughout the storage pipeline, from storage clients through storage managers to the controller.
Changes:
- Extended the abstract storage client API to return optional custom metadata from
putoperations and accept it ingetoperations - Added custom metadata tracking to
BatchMetaandDataPartitionStatusclasses - Updated storage managers to collect custom metadata from storage clients and propagate it to the controller
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| transfer_queue/storage/clients/base.py | Updated abstract base class with Optional return type for put and custom_meta parameter for get |
| transfer_queue/storage/clients/mooncake_client.py | Implemented new put signature returning None and added custom_meta parameter to get |
| transfer_queue/storage/clients/yuanrong_client.py | Added custom_meta parameter to get method signature |
| transfer_queue/metadata.py | Added _custom_meta storage and methods to get/update custom metadata in BatchMeta |
| transfer_queue/storage/managers/base.py | Updated put_data to collect custom metadata from storage clients and notify_data_update to pass it to controller |
| transfer_queue/controller.py | Added field_custom_metas tracking to DataPartitionStatus and updated production status handling |
Comments suppressed due to low confidence (3)
transfer_queue/controller.py:845
- The docstring is missing documentation for the
custom_metaparameter that was added to the method signature. The docstring should include a description of this parameter similar to the documentation fordtypesandshapes.
"""
Update production status for specific samples and fields in a partition.
Delegates to the partition's own update_production_status method.
Args:
partition_id: ID of the partition
global_indexes: List of sample indices to update
field_names: List of field names to mark as produced
dtypes: Optional per-sample field dtype information
shapes: Optional per-sample field shape information
Returns:
True if update was successful, False otherwise
"""
transfer_queue/storage/clients/yuanrong_client.py:282
- The
getmethod now accepts acustom_metaparameter to match the abstract base class signature, but this parameter is not documented in the docstring and is not used in the method implementation. Consider adding documentation for this parameter explaining that it's unused for YuanrongStorageClient but required for API compatibility.
def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> list[Any]:
"""Retrieves multiple values from remote storage with expected metadata.
Requires shape and dtype hints to reconstruct NPU tensors correctly.
Args:
keys (List[str]): Keys to fetch.
shapes (List[List[int]]): Expected tensor shapes (use [] for scalars).
dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data.
Returns:
List[Any]: Retrieved values in the same order as input keys.
"""
if shapes is None or dtypes is None:
raise ValueError("YuanrongStorageClient needs Expected shapes and dtypes")
if not (len(keys) == len(shapes) == len(dtypes)):
raise ValueError("Lengths of keys, shapes, dtypes must match")
return self._batch_get(keys, shapes, dtypes)
transfer_queue/storage/clients/mooncake_client.py:113
- The
getmethod now accepts acustom_metaparameter to match the abstract base class signature, but this parameter is not used in the method implementation. Consider adding documentation for this parameter explaining that it's unused for MooncakeStorageClient but required for API compatibility.
def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> list[Any]:
if shapes is None or dtypes is None:
raise ValueError("MooncakeStorageClient needs shapes and dtypes")
if not (len(keys) == len(shapes) == len(dtypes)):
raise ValueError("Lengths of keys, shapes, dtypes must match")
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
transfer_queue/metadata.py
Outdated
| """Get required custom meta as a list""" | ||
| return [ | ||
| self._custom_meta.get(index, {}).get(field_name, None) | ||
| for field_name, index in itertools.product(sorted(self.field_names), range(self.size)) |
Copilot
AI
Jan 21, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The get_custom_meta_list method uses range(self.size) as indices to access _custom_meta, but _custom_meta is keyed by global_index (not batch index 0, 1, 2, ...). This will cause incorrect lookups and return None for all values. The method should iterate over self.global_indexes instead of range(self.size) to correctly retrieve the custom metadata. The correct iteration should be for field_name, index in itertools.product(sorted(self.field_names), self.global_indexes).
| for field_name, index in itertools.product(sorted(self.field_names), range(self.size)) | |
| for field_name, index in itertools.product(sorted(self.field_names), self.global_indexes) |
transfer_queue/controller.py
Outdated
| self.field_dtypes.difference_update(indexes_to_release) | ||
| self.field_shapes.difference_update(indexes_to_release) | ||
| self.field_custom_metas.difference_update(indexes_to_release) |
Copilot
AI
Jan 21, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The field_dtypes, field_shapes, and field_custom_metas are dictionaries (dict[int, dict[str, Any]]), but difference_update is a set method that doesn't exist for dictionaries. This will cause an AttributeError at runtime. These should be removed using dictionary deletion operations instead, such as iterating over indexes_to_release and deleting each key if it exists.
| self.field_dtypes.difference_update(indexes_to_release) | |
| self.field_shapes.difference_update(indexes_to_release) | |
| self.field_custom_metas.difference_update(indexes_to_release) | |
| for idx in indexes_to_release: | |
| self.field_dtypes.pop(idx, None) | |
| self.field_shapes.pop(idx, None) | |
| self.field_custom_metas.pop(idx, None) |
| def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> list[Tensor]: | ||
| raise NotImplementedError("Subclasses must implement get") |
Copilot
AI
Jan 21, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The abstract get method signature has been updated to accept a custom_meta parameter, but RayStorageClient.get has not been updated to match this new signature. This creates an inconsistency where the subclass doesn't match the abstract base class signature. The get method should accept the custom_meta parameter even if it's unused.
|
|
||
| @abstractmethod | ||
| def get(self, keys: list[str], shapes=None, dtypes=None) -> list[Tensor]: | ||
| def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> list[Tensor]: |
Copilot
AI
Jan 21, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The get method lacks documentation for its parameters and return value, unlike the put method which has comprehensive documentation. Consider adding a docstring that documents the keys, shapes, dtypes, and custom_meta parameters, as well as the return value.
| def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> list[Tensor]: | |
| def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> list[Tensor]: | |
| """ | |
| Retrieve values from the storage backend by key. | |
| Args: | |
| keys (list[str]): List of keys whose values should be retrieved. | |
| shapes: Optional shape information for the expected tensors. The | |
| structure and interpretation of this argument are determined | |
| by the concrete storage backend implementation. | |
| dtypes: Optional data type information for the expected tensors. | |
| The structure and interpretation of this argument are | |
| determined by the concrete storage backend implementation. | |
| custom_meta: Optional backend-specific metadata used to control | |
| or optimize the retrieval process. Its format is defined by | |
| the concrete storage backend implementation. | |
| Returns: | |
| list[Tensor]: List of tensors retrieved from the storage backend, | |
| in the same order as the provided keys. | |
| """ |
| def _update_field_metadata( | ||
| self, | ||
| global_indices: list[int], | ||
| dtypes: Optional[dict[int, dict[str, Any]]], | ||
| shapes: Optional[dict[int, dict[str, Any]]], | ||
| dtypes: dict[int, dict[str, Any]], | ||
| shapes: dict[int, dict[str, Any]], | ||
| custom_meta: Optional[dict[int, dict[str, Any]]], | ||
| ): |
Copilot
AI
Jan 21, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _update_field_metadata method signature declares dtypes and shapes as non-Optional (lines 386-387), but the caller update_production_status passes them as Optional (lines 328-329). This type inconsistency could lead to runtime errors if None is passed. Either make the parameters Optional in _update_field_metadata or ensure they are never None when calling it.
| for (global_idx, field_name), meta_value in zip( | ||
| itertools.product(metadata.global_indexes, metadata.field_names), |
Copilot
AI
Jan 21, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The order of iteration in the itertools.product call doesn't match the order used in _generate_keys. Here it uses itertools.product(metadata.global_indexes, metadata.field_names) which iterates through all fields for each global_index, but _generate_keys uses itertools.product(sorted(field_names), global_indexes) which iterates through all global_indexes for each sorted field. This mismatch will cause custom_meta values to be associated with the wrong keys. The order should be itertools.product(sorted(metadata.field_names), metadata.global_indexes) to match the key generation order.
| for (global_idx, field_name), meta_value in zip( | |
| itertools.product(metadata.global_indexes, metadata.field_names), | |
| for (field_name, global_idx), meta_value in zip( | |
| itertools.product(sorted(metadata.field_names), metadata.global_indexes), |
transfer_queue/metadata.py
Outdated
|
|
||
| samples: list[SampleMeta] | ||
| extra_info: dict[str, Any] = dataclasses.field(default_factory=dict) | ||
| # internal data for different storage backends: _custom_meta[index][field] |
Copilot
AI
Jan 21, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment says _custom_meta[index][field] but doesn't clarify that "index" refers to the global_index, not the batch index. This ambiguity could lead to confusion. Consider updating the comment to explicitly state _custom_meta[global_index][field_name] for clarity.
| # internal data for different storage backends: _custom_meta[index][field] | |
| # internal data for different storage backends: _custom_meta[global_index][field_name] |
transfer_queue/metadata.py
Outdated
| def get_custom_meta_list(self) -> list[Any]: | ||
| """Get required custom meta as a list""" | ||
| return [ | ||
| self._custom_meta.get(index, {}).get(field_name, None) | ||
| for field_name, index in itertools.product(sorted(self.field_names), range(self.size)) | ||
| ] | ||
|
|
||
| def get_all_custom_meta(self) -> dict[int, dict[str, Any]]: | ||
| """Get the entire custom meta dictionary""" | ||
| return copy.deepcopy(self._custom_meta) | ||
|
|
||
| def update_custom_meta(self, new_custom_meta: dict[int, dict[str, Any]] = None): | ||
| """Update custom meta with a new dictionary""" | ||
| if new_custom_meta: | ||
| self._custom_meta.update(new_custom_meta) | ||
|
|
Copilot
AI
Jan 21, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new custom metadata methods (get_custom_meta_list, get_all_custom_meta, and update_custom_meta) added to the BatchMeta class lack test coverage. Since there is comprehensive test coverage for other BatchMeta methods in tests/test_metadata.py, these new methods should also have corresponding unit tests to ensure they work correctly, especially given the complexity of indexing by global_index.
2. allow kv storage manager put returns their custom metadata per index per field Signed-off-by: tianyi-ge <tianyig@outlook.com>
2. add tests for kv storage with custom meta 3. update nits Signed-off-by: tianyi-ge <tianyig@outlook.com>
…-ascend into feat/custom-meta
Signed-off-by: tianyi-ge <tianyig@outlook.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| shapes: Optional shape information for the expected tensors. The | ||
| structure and interpretation of this argument are determined | ||
| by the concrete storage backend implementation. | ||
| dtypes: Optional data type information for the expected tensors. | ||
| The structure and interpretation of this argument are | ||
| determined by the concrete storage backend implementation. | ||
| custom_meta: Optional backend-specific metadata used to control | ||
| or optimize the retrieval process. Its format is defined by | ||
| the concrete storage backend implementation. | ||
| Returns: | ||
| list[Tensor]: List of tensors retrieved from the storage backend, |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return type annotation in the docstring states that get returns a list of Tensor, but the actual return type annotation on line 39 is list[Any], and the implementation can return any type of object (not just tensors). The docstring on line 54 should be updated to reflect that the return type is list[Any], not specifically tensors, to match the actual behavior and type annotation.
| shapes: Optional shape information for the expected tensors. The | |
| structure and interpretation of this argument are determined | |
| by the concrete storage backend implementation. | |
| dtypes: Optional data type information for the expected tensors. | |
| The structure and interpretation of this argument are | |
| determined by the concrete storage backend implementation. | |
| custom_meta: Optional backend-specific metadata used to control | |
| or optimize the retrieval process. Its format is defined by | |
| the concrete storage backend implementation. | |
| Returns: | |
| list[Tensor]: List of tensors retrieved from the storage backend, | |
| shapes: Optional shape information for the expected values. The | |
| structure and interpretation of this argument are determined | |
| by the concrete storage backend implementation. | |
| dtypes: Optional data type information for the expected values. | |
| The structure and interpretation of this argument are | |
| determined by the concrete storage backend implementation. | |
| custom_meta: Optional backend-specific metadata used to control | |
| or optimize the retrieval process. Its format is defined by | |
| the concrete storage backend implementation. | |
| Returns: | |
| list[Any]: List of values retrieved from the storage backend, |
transfer_queue/controller.py
Outdated
| if custom_meta: | ||
| if len(global_indices) != len(custom_meta): | ||
| raise ValueError( | ||
| f"Length of global_indices ({len(global_indices)}) does not match " | ||
| f"length of custom_meta ({len(custom_meta)})" | ||
| ) | ||
| custom_meta_value = itemgetter(*global_indices)(custom_meta) if custom_meta else None | ||
| if not isinstance(custom_meta_value, tuple): | ||
| custom_meta_value = (custom_meta_value,) | ||
| for i, global_idx in enumerate(global_indices): | ||
| if global_idx not in self.field_custom_metas: | ||
| self.field_custom_metas[global_idx] = {} | ||
| if custom_meta_value is not None: | ||
| self.field_custom_metas[global_idx].update(custom_meta_value[i]) | ||
|
|
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicated code block detected. Lines 416-427 and lines 429-442 contain nearly identical logic for handling custom_meta updates. The first block appears to be the correct implementation with proper conditional checking, while the second block (429-442) contains duplicate logic with a slight variation in the None check on line 441. This duplication should be removed, keeping only one instance of the custom_meta handling logic.
| if custom_meta: | |
| if len(global_indices) != len(custom_meta): | |
| raise ValueError( | |
| f"Length of global_indices ({len(global_indices)}) does not match " | |
| f"length of custom_meta ({len(custom_meta)})" | |
| ) | |
| custom_meta_value = itemgetter(*global_indices)(custom_meta) if custom_meta else None | |
| if not isinstance(custom_meta_value, tuple): | |
| custom_meta_value = (custom_meta_value,) | |
| for i, global_idx in enumerate(global_indices): | |
| if global_idx not in self.field_custom_metas: | |
| self.field_custom_metas[global_idx] = {} | |
| if custom_meta_value is not None: | |
| self.field_custom_metas[global_idx].update(custom_meta_value[i]) |
| # TODO(tianyi): the order of custom meta is coupled with keys/values | ||
| # if _generate_keys or _generate_values changes, this will break | ||
| for (field_name, global_idx), meta_value in zip( | ||
| itertools.product(sorted(metadata.field_names), metadata.global_indexes), | ||
| custom_meta, | ||
| strict=False, | ||
| ): | ||
| per_field_custom_meta[global_idx][field_name] = meta_value |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment on line 486 states that the order of custom_meta is coupled with keys/values and mentions that changes to _generate_keys or _generate_values will break this. This is a significant maintainability concern. The code uses itertools.product with sorted field names and global_indexes to reconstruct the order, but this creates tight coupling and fragility. Consider adding validation or a more robust way to associate custom_meta with specific keys, or at least add integration tests that verify this ordering assumption.
| for (field_name, global_idx), meta_value in zip( | ||
| itertools.product(sorted(metadata.field_names), metadata.global_indexes), | ||
| custom_meta, | ||
| strict=False, |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The zip operation on lines 488-492 uses strict=False, which means if the lengths of the iterables don't match, the zip will silently truncate to the shortest iterable without raising an error. This is dangerous because a length mismatch check is performed on line 479-480, but if that check passes and there's still somehow a mismatch (e.g., due to a bug in the key generation logic), the error will be silently ignored. Consider using strict=True to catch such mismatches, or remove the strict parameter to make the behavior explicit since the length check already exists.
| strict=False, | |
| strict=True, |
| return None | ||
|
|
||
| def _batch_get(self, keys: list[str], shapes: list, dtypes: list) -> list[Any]: | ||
| def _batch_get(self, keys: list[str], shapes: list, dtypes: list) -> Optional[list[Any]]: |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return type annotation Optional[list[Any]] is inconsistent with the actual behavior. The _batch_get method always returns a list[Any] (never None), as seen on lines 195-264 where all code paths return a populated list. The return type should be list[Any] instead of Optional[list[Any]] to accurately reflect the actual behavior.
| def _batch_get(self, keys: list[str], shapes: list, dtypes: list) -> Optional[list[Any]]: | |
| def _batch_get(self, keys: list[str], shapes: list, dtypes: list) -> list[Any]: |
transfer_queue/controller.py
Outdated
| if not global_indices or not dtypes or not shapes: | ||
| return |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The conditional check on line 391 has been changed to also check for dtypes and shapes being non-None. However, this creates a problem: if dtypes or shapes are None, the function returns early, but then on lines 399-400, the code attempts to access dtypes and shapes without None checks, which would have already returned. This makes the subsequent None handling on lines 399-400 unreachable. The original assert statements were better for catching programming errors. If the intent is to make these parameters optional, then the function should handle None values gracefully throughout, not just return early.
transfer_queue/controller.py
Outdated
| self.field_dtypes[global_idx].update(dtype_value[i]) | ||
| self.field_shapes[global_idx].update(shape_value[i]) |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change from assert statements to ValueError is good for production code, but the removal of conditional checks for dtypes and shapes on lines 413-414 is problematic. The original code had conditions checking if dtype_value and shape_value are not None before updating. After removing these checks, the code will always attempt to update even when values might be None, which could cause errors. The early return on line 391-392 makes this less likely, but creates inconsistency in the error handling approach.
| self.field_dtypes[global_idx].update(dtype_value[i]) | |
| self.field_shapes[global_idx].update(shape_value[i]) | |
| if dtype_value[i] is not None: | |
| self.field_dtypes[global_idx].update(dtype_value[i]) | |
| if shape_value[i] is not None: | |
| self.field_shapes[global_idx].update(shape_value[i]) |
Signed-off-by: tianyi-ge <tianyig@outlook.com>
tests/test_controller.py
Outdated
|
|
||
| print("✓ Custom_meta merge on update correct") | ||
|
|
||
| def test_custom_meta_with_complex_nested_data(self, ray_setup): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not needed since we are relying python dict
tests/test_controller.py
Outdated
| print("✓ Clear meta correct") | ||
|
|
||
|
|
||
| class TestCustomMeta: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should dedup the pytest a little bit. Some of the tests are duplicated with tests/test_controller_data_partitions.py. The core custom_meta tests should be maintained in test_controller_data_partitions.py, and the test_controller.py only validates the outer interface.
| print("✓ get_field_custom_meta with partial fields works") | ||
|
|
||
| # Test 4: get_field_custom_meta with non-existent global_index | ||
| empty_meta = partition.get_field_custom_meta([999], ["input_ids"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this will happen since we strictly requires the shape of custom_meta to be exactly with number of samples (commented earlier)?
transfer_queue/controller.py
Outdated
| ): | ||
| """Update field dtype and shape metadata.""" | ||
| if not global_indices: | ||
| if not global_indices or not dtypes or not shapes: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe when only dtypes or only shapes are provided, we should also update the metadata.
| selected_fields = {name: self.fields[name] for name in field_names if name in self.fields} | ||
|
|
||
| # construct new SampleMeta instance | ||
| selected_sample_meta = SampleMeta( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: custom_meta maintains in FieldMeta
Signed-off-by: tianyi-ge <tianyig@outlook.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
transfer_queue/storage/clients/yuanrong_client.py:284
- The custom_meta parameter is accepted but not used in the implementation. According to the PR description, yuanrong storage is supposed to use custom_meta to store device info. Consider either utilizing this parameter in _batch_get to optimize retrieval based on device information, or document that this is reserved for future use.
def get(self, keys: list[str], shapes=None, dtypes=None, custom_meta=None) -> list[Any]:
"""Retrieves multiple values from remote storage with expected metadata.
Requires shape and dtype hints to reconstruct NPU tensors correctly.
Args:
keys (List[str]): Keys to fetch.
shapes (List[List[int]]): Expected tensor shapes (use [] for scalars).
dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data.
custom_meta (List[str], optional): Device type (npu/cpu) for each key
Returns:
List[Any]: Retrieved values in the same order as input keys.
"""
if shapes is None or dtypes is None:
raise ValueError("YuanrongStorageClient needs Expected shapes and dtypes")
if not (len(keys) == len(shapes) == len(dtypes)):
raise ValueError("Lengths of keys, shapes, dtypes must match")
return self._batch_get(keys, shapes, dtypes)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| for (field_name, global_idx), meta_value in zip( | ||
| itertools.product(sorted(metadata.field_names), metadata.global_indexes), | ||
| custom_meta, | ||
| strict=False, |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The strict=False parameter in zip allows for mismatched lengths between the iterables, which could mask bugs. Consider using strict=True to ensure the length of custom_meta matches the expected number of items from itertools.product. This would provide earlier detection of length mismatches beyond the check on line 479.
| strict=False, | |
| strict=True, |
| # Use itertools.product to eliminate nested loops | ||
| for global_idx in metadata.global_indexes: | ||
| per_field_custom_meta[global_idx] = {} | ||
|
|
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TODO comment indicates tight coupling between the order of custom_meta and keys/values. This ordering dependency is fragile. Consider adding a runtime assertion that validates the ordering is consistent (e.g., by comparing the generated keys with the expected iteration order), or refactor to use a dictionary-based approach for custom_meta to eliminate the ordering dependency.
| # Validate that the assumed iteration order for (field_name, global_idx) | |
| # matches the ordering used when generating keys for storage_client.put. | |
| expected_keys = self._generate_keys(sorted(metadata.field_names), metadata.global_indexes) | |
| if expected_keys != keys: | |
| raise ValueError( | |
| "Inconsistent key ordering detected: _generate_keys does not produce keys in the " | |
| "same order as sorted(metadata.field_names) x metadata.global_indexes, which is " | |
| "required for correct alignment of custom_meta." | |
| ) |
Signed-off-by: tianyi-ge <tianyig@outlook.com>
| # TODO(tianyi): the order of custom meta is coupled with keys/values | ||
| for (field_name, global_idx), meta_value in zip( | ||
| itertools.product(sorted(metadata.field_names), metadata.global_indexes), | ||
| custom_meta, | ||
| strict=True, | ||
| ): | ||
| per_field_custom_meta[global_idx][field_name] = meta_value | ||
| metadata.update_custom_meta(per_field_custom_meta) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this way better than:
id=0
for field_name in sorted(metadata.field_name ):
for global_idx in metadata.global_indexes:
per_field_custom_meta[global_idx][field_name = meta_value[id]
id+=1There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There shouldn't be any problem, I used itertools.product to implement it before as well. ~ qwq
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| keys (list): List of string keys to fetch. | ||
| shapes (list, optional): Ignored. For compatibility with KVStorageManager. | ||
| dtypes (list, optional): Ignored. For compatibility with KVStorageManager. | ||
| custom_meta (list, optional): Ray object ref for each key |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring description for custom_meta parameter is incorrect. It states "Ray object ref for each key" but custom_meta is actually backend-specific metadata (not necessarily ray object refs). The description should be more generic to match the base class documentation and reflect that this parameter is for compatibility with the KVStorageManager interface, even if ray storage doesn't use it.
| custom_meta (list, optional): Ray object ref for each key | |
| custom_meta (list, optional): Backend-specific metadata for each key. Ignored by | |
| RayStorageClient; kept for compatibility with KVStorageManager. |
transfer_queue/controller.py
Outdated
| """Get shape for a specific sample and field.""" | ||
| return self.field_shapes.get(global_index, {}).get(field_name) | ||
|
|
||
| def get_field_custom_meta(self, global_indices: list[int], field_names: list[str]) -> Optional[Any]: |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return type annotation should be dict[int, dict[str, Any]] instead of Optional[Any]. The method always returns a dictionary (possibly empty), never None, so Optional is incorrect. The specific structure being returned is also more useful to document than the generic Any type.
| def get_field_custom_meta(self, global_indices: list[int], field_names: list[str]) -> Optional[Any]: | |
| def get_field_custom_meta( | |
| self, global_indices: list[int], field_names: list[str] | |
| ) -> dict[int, dict[str, Any]]: |
transfer_queue/controller.py
Outdated
| return self.field_shapes.get(global_index, {}).get(field_name) | ||
|
|
||
| def get_field_custom_meta(self, global_indices: list[int], field_names: list[str]) -> Optional[Any]: | ||
| """Get custom_meta for a specific sample and field.""" |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring says "Get custom_meta for a specific sample and field" but the method signature takes lists of global_indices and field_names, so it handles multiple samples and fields. The docstring should be updated to reflect this: "Get custom_meta for multiple samples and fields."
| """Get custom_meta for a specific sample and field.""" | |
| """Get custom_meta for multiple samples and fields.""" |
| global_indexes: Data update related global_indexes. | ||
| dtypes: Per-field dtypes for each field, in {global_index: {field: dtype}} format. | ||
| shapes: Per-field shapes for each field, in {global_index: {field: shape}} format. | ||
| custom_meta: Per-field custom_meta for each field, in {global_index: {field: custom_meta}} format. |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The parameter custom_meta in the docstring describes it as a list but the actual parameter type is Optional[dict[int, dict[str, Any]]] based on the method signature and usage. The docstring should accurately reflect that this is a dictionary mapping global_indexes to field-level custom metadata, not a list.
| custom_meta: Per-field custom_meta for each field, in {global_index: {field: custom_meta}} format. | |
| custom_meta: Optional dict mapping each global_index to per-field custom metadata, | |
| in the format {global_index: {field: custom_meta}}. |
| keys (List[str]): Keys to fetch. | ||
| shapes (List[List[int]]): Expected tensor shapes (use [] for scalars). | ||
| dtypes (List[Optional[torch.dtype]]): Expected dtypes; use None for non-tensor data. | ||
| custom_meta (List[str], optional): Device type (npu/cpu) for each key |
Copilot
AI
Jan 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring description for custom_meta parameter is incorrect. It states "List[str], optional: Device type (npu/cpu) for each key" but custom_meta is actually a list aligned with keys containing backend-specific metadata (not just device types, and not necessarily strings). The description should be more generic to match the base class documentation and not assume yuanrong-specific semantics.
| custom_meta (List[str], optional): Device type (npu/cpu) for each key | |
| custom_meta (List[Any], optional): Backend-specific metadata for each key, aligned with ``keys``. |
Signed-off-by: tianyi-ge <tianyig@outlook.com>
Goal
Add
custom_metato provide flexibility for different storage backends so that controller can help to maintain these backend-specific info.Example use cases:
Unit tests
Unit tests are added to