Skip to content
This repository was archived by the owner on Jan 21, 2026. It is now read-only.

[BREAKING][Feat] Support user-defined sampler to manager data consumption#101

Merged
0oshowero0 merged 8 commits into
TransferQueue:devfrom
jianjunzhong:refactor/1105
Nov 10, 2025
Merged

[BREAKING][Feat] Support user-defined sampler to manager data consumption#101
0oshowero0 merged 8 commits into
TransferQueue:devfrom
jianjunzhong:refactor/1105

Conversation

@jianjunzhong

@jianjunzhong jianjunzhong commented Nov 5, 2025

Copy link
Copy Markdown
Contributor

Summary by CodeRabbit

  • New Features

    • Introduced pluggable sampling strategies for metadata retrieval, including sequential and GRPO group-based sampling options.
    • Exposed new sampler classes through the public API for advanced sampling configuration.
  • API Changes

    • Replaced get_n_samples parameter with sampling_config (dictionary-based) in metadata retrieval methods for more flexible sampling control.

Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
@coderabbitai

coderabbitai Bot commented Nov 5, 2025

Copy link
Copy Markdown

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

This pull request introduces a pluggable sampling system to the TransferQueue architecture. It replaces the boolean get_n_samples parameter with a sampling_config dictionary parameter across client and controller APIs. Three sampler implementations are added: a BaseSampler abstract base class, SequentialSampler, and GRPOGroupNSampler. The controller is updated to accept and utilize samplers during metadata retrieval operations.

Changes

Cohort / File(s) Summary
Sampler Infrastructure
transfer_queue/sampler/base.py, transfer_queue/sampler/sequential_sampler.py, transfer_queue/sampler/grpo_group_n_sampler.py
Introduces new sampler abstraction: BaseSampler with abstract sample(ready_indexes, batch_size, **kwargs) method; SequentialSampler selects first batch_size elements; GRPOGroupNSampler groups indexes by n_samples_per_prompt and enforces batch_size divisibility.
Sampler Module Exports
transfer_queue/sampler/__init__.py
Exposes new sampler classes via __all__: BaseSampler, SequentialSampler, GRPOGroupNSampler.
Client API Updates
transfer_queue/client.py
Replaced get_n_samples boolean parameter with sampling_config: Optional[dict[str, Any]] in AsyncTransferQueueClient.async_get_meta and TransferQueueClient.get_meta signatures; updated request payload and docstrings accordingly.
Controller Core Updates
transfer_queue/controller.py
Added sampler support to __init__ (accepts BaseSampler instance or type, defaults to SequentialSampler); refactored get_metadata to use sampling_config instead of get_n_samples; renamed batch_global_indices to batch_global_indexes in generate_batch_meta; introduced consumed_indexes parameter; added five new internal validation helpers (_validate_partition_id, _validate_required_params, _extract_optional_params, _validate_get_meta_params, _validate_check_consumption_params).
Module Exports
transfer_queue/__init__.py
Added public imports: BaseSampler, GRPOGroupNSampler, SequentialSampler; updated __all__ accordingly.
Demo and Test Updates
recipe/simple_use_case/async_demo.py, recipe/simple_use_case/sync_demo.py, tests/test_controller.py
Removed get_n_samples arguments from get_meta and async_get_meta calls; added commented guidance blocks in demo files for GRPOGroupNSampler usage.

Sequence Diagram

sequenceDiagram
    actor Client
    participant AsyncClient as AsyncTransferQueueClient
    participant Controller as TransferQueueController
    participant Sampler as BaseSampler

    Client->>AsyncClient: async_get_meta(data_fields, batch_size, partition_id, mode="fetch", sampling_config={...})
    AsyncClient->>Controller: GET_META request with sampling_config payload
    
    alt mode == "fetch"
        Controller->>Sampler: sample(ready_indexes, batch_size, **sampling_config)
        Sampler-->>Controller: (sampled_indexes, consumed_indexes)
        Controller->>Controller: generate_batch_meta(batch_global_indexes, consumed_indexes, ...)
    else mode == "insert"
        Controller->>Controller: generate_batch_meta(batch_global_indexes, consumed_indexes, ...)
    end
    
    Controller-->>AsyncClient: BatchMeta response
    AsyncClient-->>Client: BatchMeta
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Areas requiring extra attention:
    • transfer_queue/controller.py: Substantial refactor with new sampler initialization, sampling_config routing logic, and five new validation helpers; requires verification of state handling and error paths
    • transfer_queue/sampler/grpo_group_n_sampler.py: PyTorch tensor reshaping and grouping logic for n_samples_per_prompt; verify batch_size divisibility enforcement and index flattening correctness
    • transfer_queue/client.py: Signature changes across two public methods (async and sync); ensure backward compatibility implications are understood
    • Cross-file consistency: Verify that batch_global_indicesbatch_global_indexes renaming is applied consistently throughout the controller and consumed_indexes is threaded correctly through all paths

Possibly related PRs

Poem

🐰 A sampler's quest through indexes neat,
From SequentialHops to GRPO's beat,
Config replaces flags of old,
Pluggable strategies, fresh and bold,
Metadata flows where samplers lead! 🎯

Pre-merge checks and finishing touches

✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately reflects the main change: introducing support for user-defined samplers to manage data consumption, which is the core feature added across multiple files.

Comment @coderabbitai help to get the list of available commands and usage tips.

@0oshowero0 0oshowero0 requested a review from Copilot November 5, 2025 10:48
@0oshowero0

Copy link
Copy Markdown
Member

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Nov 5, 2025

Copy link
Copy Markdown
✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces a pluggable sampler abstraction for the TransferQueue system, allowing users to customize how samples are selected and consumed from the queue. The PR refactors the existing sequential sampling logic into a dedicated SequentialSampler class and adds a GRPOGroupNSampler for grouped sampling scenarios.

Key changes:

  • Introduces BaseSampler abstract class with a standardized sample() interface
  • Implements SequentialSampler and GRPOGroupNSampler as concrete sampling strategies
  • Updates TransferQueueController to accept and use configurable samplers
  • Replaces the get_n_samples parameter with a more flexible sampling_config dictionary

Reviewed Changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
transfer_queue/sampler/base.py Defines abstract BaseSampler class with sampling interface
transfer_queue/sampler/sequential_sampler.py Implements basic sequential sampling strategy
transfer_queue/sampler/grpo_group_n_sampler.py Implements grouped sampling for GRPO use cases
transfer_queue/sampler/init.py Exports sampler classes for public API
transfer_queue/controller.py Integrates sampler support into controller, refactors parameter validation
transfer_queue/client.py Updates client API to use sampling_config instead of get_n_samples
transfer_queue/init.py Exports sampler classes at package level
tests/test_controller.py Removes deprecated get_n_samples parameter from tests
recipe/simple_use_case/sync_demo.py Removes get_n_samples and adds usage examples
recipe/simple_use_case/async_demo.py Removes get_n_samples and adds usage examples

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread transfer_queue/controller.py
Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/sampler/grpo_group_n_sampler.py Outdated
Comment thread transfer_queue/sampler/grpo_group_n_sampler.py
Comment thread transfer_queue/sampler/base.py Outdated
Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/sampler/grpo_group_n_sampler.py Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 445edb4 and fcbf4e6.

📒 Files selected for processing (10)
  • recipe/simple_use_case/async_demo.py (1 hunks)
  • recipe/simple_use_case/sync_demo.py (1 hunks)
  • tests/test_controller.py (0 hunks)
  • transfer_queue/__init__.py (2 hunks)
  • transfer_queue/client.py (7 hunks)
  • transfer_queue/controller.py (11 hunks)
  • transfer_queue/sampler/__init__.py (1 hunks)
  • transfer_queue/sampler/base.py (1 hunks)
  • transfer_queue/sampler/grpo_group_n_sampler.py (1 hunks)
  • transfer_queue/sampler/sequential_sampler.py (1 hunks)
💤 Files with no reviewable changes (1)
  • tests/test_controller.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Agent

Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/sampler/grpo_group_n_sampler.py Outdated
Comment thread recipe/simple_use_case/async_demo.py Outdated
Comment thread transfer_queue/sampler/base.py Outdated
Comment thread transfer_queue/sampler/base.py Outdated
Comment thread transfer_queue/sampler/base.py Outdated
Comment thread transfer_queue/sampler/base.py
Comment thread transfer_queue/sampler/base.py Outdated
Comment thread transfer_queue/sampler/base.py
Comment thread transfer_queue/sampler/sequential_sampler.py
Comment thread transfer_queue/client.py Outdated
Comment thread transfer_queue/client.py Outdated
Comment thread transfer_queue/controller.py
Comment thread transfer_queue/controller.py
Comment thread transfer_queue/controller.py
Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/controller.py Outdated
Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
…tions in async_demo.py and sync_demo.py

Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
Comment thread recipe/simple_use_case/async_demo.py Outdated
Comment thread recipe/simple_use_case/async_demo.py Outdated
Comment thread recipe/simple_use_case/async_demo.py Outdated
Comment thread recipe/simple_use_case/async_demo.py Outdated
Comment thread recipe/simple_use_case/sync_demo.py Outdated
Comment thread recipe/simple_use_case/sync_demo.py Outdated
Comment thread recipe/simple_use_case/sync_demo.py Outdated
Comment thread recipe/simple_use_case/async_demo.py Outdated
Comment thread transfer_queue/sampler/base.py Outdated
Comment thread transfer_queue/sampler/base.py Outdated
Comment thread transfer_queue/sampler/base.py Outdated
Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/controller.py
Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/controller.py Outdated
Comment thread transfer_queue/controller.py Outdated
…and consistency

Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
…ransferQueueController

Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
task_name="rl_training",
sampling_config={"n_samples_per_prompt": 4} # 4 samples per prompt
)
# This will return 16 samples organized as 4 groups of 4 samples each

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# This will return 16 samples organized as 4 groups of 4 samples each

Comment thread transfer_queue/sampler/grpo_group_n_sampler.py Outdated
Comment on lines +61 to +65
belong to the same prompt group:
```
ready_indexes = [prompt1_sample1, prompt1_sample2, prompt1_sample3, prompt1_sample4,
prompt2_sample1, prompt2_sample2, prompt2_sample3, prompt2_sample4, ...]
```

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
belong to the same prompt group:
```
ready_indexes = [prompt1_sample1, prompt1_sample2, prompt1_sample3, prompt1_sample4,
prompt2_sample1, prompt2_sample2, prompt2_sample3, prompt2_sample4, ...]
```

Comment on lines +83 to +84
*args: Any,
**kwargs: Any,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
*args: Any,
**kwargs: Any,

Comment on lines +97 to +98
*args: Additional positional arguments (ignored in current implementation)
**kwargs: Additional keyword arguments (ignored in current implementation)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
*args: Additional positional arguments (ignored in current implementation)
**kwargs: Additional keyword arguments (ignored in current implementation)

Comment on lines +57 to +58
*args: Any,
**kwargs: Any,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
*args: Any,
**kwargs: Any,

Comment on lines +66 to +67
*args: Additional positional arguments (ignored).
**kwargs: Additional keyword arguments (ignored).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
*args: Additional positional arguments (ignored).
**kwargs: Additional keyword arguments (ignored).

Comment thread transfer_queue/sampler/grpo_group_n_sampler.py Outdated
@0oshowero0 0oshowero0 merged commit b7deac2 into TransferQueue:dev Nov 10, 2025
3 checks passed
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants