0oshowero0 (Collaborator) commented Jan 23, 2026

Background

The previous PR, TransferQueue/TransferQueue#157, implemented a coarse-grained status check.

Each of the following TransferQueueClient APIs returns a single boolean flag indicating whether all samples have been produced or consumed.

    async def async_check_consumption_status(
        self,
        task_name: str,
        partition_id: str,
        socket: Optional[zmq.asyncio.Socket] = None,
    ) -> bool:
        """Check if all samples for current partition have been consumed by a specific task.

        Args:
            task_name: Name of the task to check consumption for
            partition_id: Partition id to check consumption status for
            socket: ZMQ async socket for message transmission (injected by decorator)

        Returns:
            bool: True if all samples have been consumed by the task, False otherwise

        Raises:
            RuntimeError: If communication fails or controller returns error response
        """

    async def async_check_production_status(
        self,
        data_fields: list[str],
        partition_id: str,
        socket: Optional[zmq.asyncio.Socket] = None,
    ) -> bool:
        """Check if all samples for current partition are ready (produced) for consumption.

        Args:
            data_fields: Data fields to check production status for
            partition_id: Partition id to check production status for
            socket: ZMQ async socket for message transmission (injected by decorator)

        Returns:
            bool: True if all samples have been produced and ready, False otherwise

        Raises:
            RuntimeError: If communication fails or controller returns error response
        """
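
Because these APIs return only one flag, a caller that needs to wait for a partition typically has to poll. A minimal sketch of such a loop, assuming a stand-in client: `FakeClient`, `wait_until_produced`, the field name `"prompt"`, and the partition id `"train_0"` are hypothetical and used only for illustration. The real `TransferQueueClient` needs a running controller.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for TransferQueueClient; reports ready on the third check."""

    def __init__(self) -> None:
        self.calls = 0

    async def async_check_production_status(self, data_fields: list[str], partition_id: str) -> bool:
        self.calls += 1
        return self.calls >= 3


async def wait_until_produced(client, data_fields, partition_id, interval=0.01, timeout=1.0) -> bool:
    """Poll the coarse-grained flag until it is True or the timeout expires."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        if await client.async_check_production_status(data_fields, partition_id):
            return True
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(interval)


client = FakeClient()
ready = asyncio.run(wait_until_produced(client, ["prompt"], "train_0"))
```

The flag says nothing about which samples are still missing, which is the gap the fine-grained API addresses.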

Changes

This PR introduces a fine-grained check mechanism that lets users retrieve the detailed production and consumption status of every individual sample.

    async def async_get_consumption_status(
        self,
        task_name: str,
        partition_id: str,
        socket: Optional[zmq.asyncio.Socket] = None,
    ) -> tuple[Optional[Tensor], Optional[Tensor]]:
        """Get consumption status for current partition in a specific task.

        Args:
            task_name: Name of the task to check consumption for
            partition_id: Partition id to check consumption status for
            socket: ZMQ async socket for message transmission (injected by decorator)

        Returns:
            Tuple of:
            - Partition global index tensor
            - Consumption status tensor for the specified task. 1 for consumed, 0 for not consumed.
        """

    async def async_get_production_status(
        self,
        data_fields: list[str],
        partition_id: str,
        socket: Optional[zmq.asyncio.Socket] = None,
    ) -> tuple[Optional[Tensor], Optional[Tensor]]:
        """Get production status of the specified data fields for current partition.

        Args:
            data_fields: Data fields to check production status for
            partition_id: Partition id to check production status for
            socket: ZMQ async socket for message transmission (injected by decorator)

        Returns:
            Tuple of:
            - Partition global index tensor
            - Production status tensor for the specified data fields. 1 for ready, 0 for not ready.

        Raises:
            RuntimeError: If communication fails or controller returns error response
        """
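
One way a caller might use the returned pair is to mask the global index tensor with the status tensor to find the samples that are not yet ready or not yet consumed. The sketch below assumes `Tensor` is `torch.Tensor`, that both tensors are 1-D and aligned element by element, and that a `None` result means no status is available yet. `pending_indices` is a hypothetical helper, not part of this PR.

```python
import torch


def pending_indices(global_index, status):
    """Return global indices whose status is 0, or an empty tensor if either input is None."""
    if global_index is None or status is None:
        # Assumption: None means the partition has no status to report yet.
        return torch.empty(0, dtype=torch.long)
    # Boolean-mask indexing keeps the entries where status equals 0.
    return global_index[status == 0]


# Example values shaped like the documented return: (global index tensor, status tensor).
global_index = torch.tensor([8, 9, 10, 11])
status = torch.tensor([1, 0, 1, 0])
pending = pending_indices(global_index, status)
```

Here `pending` holds the global indices 9 and 11, the two samples whose status is 0.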

CC @NINGBENZHE @walterchenchn

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Copilot AI review requested due to automatic review settings January 23, 2026 04:17

Copilot AI left a comment

Pull request overview

This PR introduces fine-grained production and consumption status retrieval capabilities. Previously, the API only returned boolean values indicating whether all samples were produced/consumed. Now, the API returns tensors with per-sample status information, enabling users to retrieve detailed status for every individual sample.

Changes:

  • Renamed ZMQ request types from CHECK_CONSUMPTION/CHECK_PRODUCTION to GET_CONSUMPTION/GET_PRODUCTION to reflect the new functionality
  • Modified get_consumption_status and get_production_status_for_fields methods to return tuples of (global_index_tensor, status_tensor) instead of booleans
  • Added new client methods async_get_consumption_status and async_get_production_status that return the fine-grained tensor data
  • Maintained backward compatibility by implementing async_check_consumption_status and async_check_production_status as wrappers that convert tensor results to booleans
  • Updated all tests to verify the new tensor-based return values
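
The backward-compatible wrappers described in the list above could be sketched as follows. This is an illustration under stated assumptions, not the code in this PR: `SketchClient` and `all_done` are hypothetical, the canned tensors are made up, and treating a `None` or empty status as "not done" is an assumed rule.

```python
import asyncio
from typing import Optional

import torch
from torch import Tensor


def all_done(status: Optional[Tensor]) -> bool:
    """Collapse a per-sample status tensor into a single flag."""
    # Assumption: a missing or empty status counts as "not done".
    if status is None or status.numel() == 0:
        return False
    return bool(torch.all(status == 1).item())


class SketchClient:
    """Hypothetical client that returns a canned fine-grained response."""

    async def async_get_consumption_status(self, task_name: str, partition_id: str):
        # (global index tensor, status tensor); sample 2 is not yet consumed.
        return torch.tensor([0, 1, 2]), torch.tensor([1, 1, 0])

    async def async_check_consumption_status(self, task_name: str, partition_id: str) -> bool:
        # The coarse-grained check delegates to the fine-grained call.
        _, status = await self.async_get_consumption_status(task_name, partition_id)
        return all_done(status)


done = asyncio.run(SketchClient().async_check_consumption_status("train", "train_0"))
```

Keeping the boolean methods as thin wrappers means existing callers are unaffected while new callers can ask for the per-sample tensors directly.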

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 14 comments.

| File | Description |
| --- | --- |
| transfer_queue/utils/zmq_utils.py | Renamed enum values from CHECK_* to GET_* to reflect fine-grained retrieval |
| transfer_queue/controller.py | Modified status methods to return (global_index, status_tensor) tuples; updated request handling logic |
| transfer_queue/client.py | Added new async_get_* methods for fine-grained status retrieval and maintained backward compatibility with async_check_* methods |
| tests/test_controller_data_partitions.py | Updated tests to handle new tuple return values and added tests for mask parameter |
| tests/test_controller.py | Updated assertions to validate global_index and status tensors |
| tests/test_client.py | Added tests for new get_* methods and updated mock responses |
Comments suppressed due to low confidence (1)

transfer_queue/controller.py:520

  • The return type annotation is incorrect. When field_names is None, empty, or a field is not found, the function returns False (a boolean), but the function signature declares the return type as tuple[Tensor, Tensor]. This will cause type checking failures. The early return statements on lines 515 and 520 should return (None, None) to match the declared return type.
        if self.production_status is None or field_names is None or len(field_names) == 0:
            return False

        # Check if all requested fields are registered
        for field_name in field_names:
            if field_name not in self.field_name_mapping:
                return False
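
A sketch of what the suggested fix could look like, written as a free function so it runs on its own. The name `production_status_for_fields_sketch`, the assumed layout of `production_status` (rows are samples, columns are fields), and the body after the guard clauses are assumptions for illustration, not the controller's actual implementation. Note that returning `(None, None)` also requires the annotation to allow `None`, as the client-side signatures already do.

```python
from typing import Optional

import torch
from torch import Tensor


def production_status_for_fields_sketch(
    production_status: Optional[Tensor],
    field_name_mapping: dict[str, int],
    field_names: Optional[list[str]],
) -> tuple[Optional[Tensor], Optional[Tensor]]:
    # Guard clauses return a (None, None) pair instead of False.
    if production_status is None or not field_names:
        return None, None
    # Every requested field must be registered.
    if any(name not in field_name_mapping for name in field_names):
        return None, None
    # Assumed layout: rows are samples, columns are fields.
    cols = [field_name_mapping[name] for name in field_names]
    global_index = torch.arange(production_status.shape[0])
    # A sample is ready only when all requested fields are marked 1.
    ready = (production_status[:, cols] == 1).all(dim=1).to(torch.int8)
    return global_index, ready


table = torch.tensor([[1, 0], [1, 1]])
mapping = {"a": 0, "b": 1}
index, ready = production_status_for_fields_sketch(table, mapping, ["a", "b"])
```

With this shape, every return path yields a two-element tuple, so callers can always unpack the result.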
