[BREAKING][Feat] Support user-defined sampler to manage data consumption #101
Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
Walkthrough
This pull request introduces a pluggable sampling system to the TransferQueue architecture. It replaces the boolean …
Sequence Diagram
sequenceDiagram
actor Client
participant AsyncClient as AsyncTransferQueueClient
participant Controller as TransferQueueController
participant Sampler as BaseSampler
Client->>AsyncClient: async_get_meta(data_fields, batch_size, partition_id, mode="fetch", sampling_config={...})
AsyncClient->>Controller: GET_META request with sampling_config payload
alt mode == "fetch"
Controller->>Sampler: sample(ready_indexes, batch_size, **sampling_config)
Sampler-->>Controller: (sampled_indexes, consumed_indexes)
Controller->>Controller: generate_batch_meta(batch_global_indexes, consumed_indexes, ...)
else mode == "insert"
Controller->>Controller: generate_batch_meta(batch_global_indexes, consumed_indexes, ...)
end
Controller-->>AsyncClient: BatchMeta response
AsyncClient-->>Client: BatchMeta
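The controller branch in the diagram above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `FirstNSampler`, `handle_get_meta`, `next_free_index`, and the dictionary standing in for `BatchMeta` are all hypothetical names, and the behavior of `mode="insert"` (allocating fresh indexes and consuming nothing) is an assumption.

```python
class FirstNSampler:
    """Hypothetical stand-in sampler: takes the first batch_size ready indexes."""

    def sample(self, ready_indexes, batch_size, **sampling_config):
        sampled = list(ready_indexes[:batch_size])
        # Mirrors the (sampled_indexes, consumed_indexes) pair from the diagram.
        return sampled, list(sampled)


def handle_get_meta(sampler, ready_indexes, batch_size, mode="fetch",
                    sampling_config=None, next_free_index=0):
    """Hypothetical controller-side dispatch on the request mode."""
    sampling_config = sampling_config or {}
    if mode == "fetch":
        # The sampling_config payload is forwarded as keyword arguments.
        batch_global_indexes, consumed_indexes = sampler.sample(
            ready_indexes, batch_size, **sampling_config
        )
    elif mode == "insert":
        # Assumption: insert allocates fresh slots and bypasses the sampler.
        batch_global_indexes = list(
            range(next_free_index, next_free_index + batch_size)
        )
        consumed_indexes = []
    else:
        raise ValueError(f"unsupported mode: {mode!r}")
    # A plain dict stands in for generate_batch_meta(...) and BatchMeta.
    return {
        "global_indexes": batch_global_indexes,
        "consumed_indexes": consumed_indexes,
    }
```

Only the `fetch` path consults the sampler, which is what lets a different strategy be swapped in without touching the `insert` path.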
@coderabbitai review
✅ Actions performed: Review triggered.
Pull Request Overview
This PR introduces a pluggable sampler abstraction for the TransferQueue system, allowing users to customize how samples are selected and consumed from the queue. The PR refactors the existing sequential sampling logic into a dedicated SequentialSampler class and adds a GRPOGroupNSampler for grouped sampling scenarios.
Key changes:
- Introduces `BaseSampler` abstract class with a standardized `sample()` interface
- Implements `SequentialSampler` and `GRPOGroupNSampler` as concrete sampling strategies
- Updates `TransferQueueController` to accept and use configurable samplers
- Replaces the `get_n_samples` parameter with a more flexible `sampling_config` dictionary
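As a rough illustration of the abstraction described above, the interface might look like the following. This is a sketch under assumptions, not the code from `transfer_queue/sampler/`: the type hints, the docstrings, and the body of `SequentialSampler.sample` are guesses based on the `sample(ready_indexes, batch_size, ...)` call and the `(sampled_indexes, consumed_indexes)` return pair shown in the sequence diagram.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Tuple


class BaseSampler(ABC):
    """Sketch of an abstract sampler; the real class may differ."""

    @abstractmethod
    def sample(
        self,
        ready_indexes: List[int],
        batch_size: int,
        *args: Any,
        **kwargs: Any,
    ) -> Tuple[List[int], List[int]]:
        """Return (sampled_indexes, consumed_indexes)."""
        raise NotImplementedError


class SequentialSampler(BaseSampler):
    """Sketch: take the first batch_size ready indexes in order."""

    def sample(self, ready_indexes, batch_size, *args, **kwargs):
        sampled = list(ready_indexes[:batch_size])
        # Assumption: every sampled index is also marked as consumed.
        return sampled, list(sampled)
```

Returning the consumed indexes separately from the sampled ones leaves room for strategies that hand out a sample without consuming it, or consume more than they return.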
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| transfer_queue/sampler/base.py | Defines abstract BaseSampler class with sampling interface |
| transfer_queue/sampler/sequential_sampler.py | Implements basic sequential sampling strategy |
| transfer_queue/sampler/grpo_group_n_sampler.py | Implements grouped sampling for GRPO use cases |
| `transfer_queue/sampler/__init__.py` | Exports sampler classes for public API |
| transfer_queue/controller.py | Integrates sampler support into controller, refactors parameter validation |
| transfer_queue/client.py | Updates client API to use sampling_config instead of get_n_samples |
| `transfer_queue/__init__.py` | Exports sampler classes at package level |
| tests/test_controller.py | Removes deprecated get_n_samples parameter from tests |
| recipe/simple_use_case/sync_demo.py | Removes get_n_samples and adds usage examples |
| recipe/simple_use_case/async_demo.py | Removes get_n_samples and adds usage examples |
Actionable comments posted: 2
📒 Files selected for processing (10)
- recipe/simple_use_case/async_demo.py (1 hunks)
- recipe/simple_use_case/sync_demo.py (1 hunks)
- tests/test_controller.py (0 hunks)
- transfer_queue/__init__.py (2 hunks)
- transfer_queue/client.py (7 hunks)
- transfer_queue/controller.py (11 hunks)
- transfer_queue/sampler/__init__.py (1 hunks)
- transfer_queue/sampler/base.py (1 hunks)
- transfer_queue/sampler/grpo_group_n_sampler.py (1 hunks)
- transfer_queue/sampler/sequential_sampler.py (1 hunks)
💤 Files with no reviewable changes (1)
- tests/test_controller.py
…tions in async_demo.py and sync_demo.py Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
…and consistency Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
…ransferQueueController Signed-off-by: jianjunzhong <jianjunzhong@foxmail.com>
    task_name="rl_training",
    sampling_config={"n_samples_per_prompt": 4}  # 4 samples per prompt
)
# This will return 16 samples organized as 4 groups of 4 samples each
belong to the same prompt group:
```
ready_indexes = [prompt1_sample1, prompt1_sample2, prompt1_sample3, prompt1_sample4,
                 prompt2_sample1, prompt2_sample2, prompt2_sample3, prompt2_sample4, ...]
```
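The grouped layout above suggests a sampler that only returns complete prompt groups. The sketch below is one possible reading, not the `GRPOGroupNSampler` from this PR: the function name `sample_complete_groups` is hypothetical, and it assumes that global index `i` belongs to prompt group `i // n_samples_per_prompt`, that `batch_size` counts samples rather than prompts, and that nothing is returned when too few complete groups are ready.

```python
from collections import defaultdict


def sample_complete_groups(ready_indexes, batch_size, n_samples_per_prompt):
    """Hypothetical group-aware sampling over ready global indexes."""
    if batch_size % n_samples_per_prompt != 0:
        raise ValueError("batch_size must be a multiple of n_samples_per_prompt")

    # Assumption: index i belongs to prompt group i // n_samples_per_prompt.
    groups = defaultdict(list)
    for idx in sorted(ready_indexes):
        groups[idx // n_samples_per_prompt].append(idx)

    groups_needed = batch_size // n_samples_per_prompt
    sampled = []
    picked = 0
    for group_id in sorted(groups):
        if picked == groups_needed:
            break
        members = groups[group_id]
        # Skip groups that still have samples outstanding.
        if len(members) == n_samples_per_prompt:
            sampled.extend(members)
            picked += 1

    if picked < groups_needed:
        # Assumption: return nothing rather than a partial batch.
        return [], []
    return sampled, list(sampled)
```

With `n_samples_per_prompt=4` and `batch_size=16`, this returns four whole groups of four, matching the "4 groups of 4 samples" shape mentioned in the demo comment.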
        *args: Any,
        **kwargs: Any,
            *args: Additional positional arguments (ignored in current implementation)
            **kwargs: Additional keyword arguments (ignored in current implementation)
        *args: Any,
        **kwargs: Any,
            *args: Additional positional arguments (ignored).
            **kwargs: Additional keyword arguments (ignored).
Summary by CodeRabbit
New Features
API Changes
- Replaced the `get_n_samples` parameter with `sampling_config` (dictionary-based) in metadata retrieval methods for more flexible sampling control.
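To make the flexibility concrete, here is a sketch of how a dictionary-based `sampling_config` could carry an option to a user-defined sampler. `SeededRandomSampler` and the `seed` key are hypothetical and do not appear in this PR; the sketch only assumes that the dictionary is unpacked into keyword arguments of `sample()`, as the sequence diagram indicates.

```python
import random


class SeededRandomSampler:
    """Hypothetical user-defined sampler: draws a random batch without replacement."""

    def sample(self, ready_indexes, batch_size, seed=None, **kwargs):
        if len(ready_indexes) < batch_size:
            # Assumption: return nothing until enough samples are ready.
            return [], []
        rng = random.Random(seed)  # a fixed seed makes the draw reproducible
        sampled = rng.sample(list(ready_indexes), batch_size)
        return sampled, list(sampled)


# The dict is unpacked into keyword arguments, so new options need no API change.
sampling_config = {"seed": 0}
sampler = SeededRandomSampler()
sampled, consumed = sampler.sample(list(range(10)), 4, **sampling_config)
```

Keys that a given sampler does not recognize fall into `**kwargs` and are ignored here, so one config dictionary can be shared across strategies.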