datareader : sampling with moving weights #1174
artemru wants to merge 1 commit into main_w2v2_pretraining
Conversation
mattsetz
left a comment
looking good! i left some questions about the distributed processing, but mostly for clearing up my own understanding.
```python
next_name = self._next_dataset_name()
try:
    batch = next(self._pipeline_iters[next_name])
except StopIteration:
```
when would we expect this to occur? when the pipeline has epoched through all data?
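For context, a minimal sketch (hypothetical names, not the actual implementation) of the common pattern where a `StopIteration` from an exhausted pipeline iterator signals the end of an epoch and the iterator is reset:

```python
def next_batch(pipelines, iters, name):
    """Fetch the next batch from the named pipeline, resetting it on epoch end."""
    try:
        return next(iters[name])
    except StopIteration:
        # The pipeline has epoched through all its data; start a new epoch.
        iters[name] = iter(pipelines[name])
        return next(iters[name])

# Usage: two toy "pipelines" of unequal length.
pipelines = {"a": [1, 2], "b": [10]}
iters = {k: iter(v) for k, v in pipelines.items()}
out = [next_batch(pipelines, iters, "b") for _ in range(3)]  # keeps cycling "b"
```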
```python
# If we read less than `num_accumulate` batches, it means we reached end
# of data.
if self._options.drop_remainder and len(batches) != num_accumulate:
    batches.clear()
```
hm. a bit confused here. some clarifying questions:
- when would this condition be True? isn't that case handled by the above exception handling?
- dumb Q: what does `batches.clear()` do? (just empty the batch list?)
- at a conceptual level, seems like this logic is used to determine if we should halt training once we reach end of data (or when any of the pipelines reaches end of data), if `self._options.drop_remainder` is set to True (and if False, we just continue epoching the data). is this understanding correct?
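To make the accumulate/drop-remainder interaction concrete, here is a hedged sketch (simplified, toy names) of the pattern under discussion: read up to `num_accumulate` batches, and if the iterator runs dry mid-accumulation, `drop_remainder` discards the partial group:

```python
def read_accumulated(it, num_accumulate, drop_remainder):
    """Read up to `num_accumulate` batches; optionally drop a short remainder."""
    batches = []
    for _ in range(num_accumulate):
        try:
            batches.append(next(it))
        except StopIteration:
            break
    # Fewer than `num_accumulate` batches means we hit the end of data.
    if drop_remainder and len(batches) != num_accumulate:
        batches.clear()  # list.clear() empties the list in place
    return batches

full = read_accumulated(iter(range(5)), 2, True)  # a full group of 2
rest = read_accumulated(iter([9]), 2, True)       # short group, dropped
```

So yes: `batches.clear()` just empties the list in place, and the condition fires when the exception handler breaks out after a *partial* accumulation rather than before any batch was read.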
```python
num_batches = _min_num_batches(local_num_batches, self._gang)

if num_batches != local_num_batches:
    batches = batches[:num_batches]
```
does this imply that batches is a global list of batches across all ranks? i thought this was a local list...
There was a problem hiding this comment.
this is the general fs2 logic for handling a pipeline's epoch end: for some ranks data will be returned, but the others should skip their turn.
Probably for this scheduled mixture we don't need that, since we're dealing with an infinite loop here.
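To answer the question above: `batches` stays local to each rank; only the *count* is synchronized (conceptually an all-reduce with MIN over the gang). A minimal single-process simulation of that logic, with made-up names:

```python
def min_num_batches(local_counts):
    """Simulate `_min_num_batches`: every rank contributes its local batch
    count and all ranks agree on the minimum (an all-reduce with MIN)."""
    return min(local_counts)

# Rank 2 has hit end of data and produced only 1 batch this round.
local_counts = [4, 4, 1]
num_batches = min_num_batches(local_counts)

# Each rank then truncates its own local list so all ranks step in lockstep.
rank0_batches = list(range(4))[:num_batches]
```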
|
|
||
```python
def _next_dataset_name(self) -> str:
    weights = np.array(
        [scheduler(self._step) for scheduler in self._weights_schedulers.values()],
```
i like the setup of making this callable. instead of having schedulers for each data pipeline, we could have one master scheduler function that returns the list of weights. but either works; i suppose the option you've implemented here gives us more flexibility.
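For illustration, a self-contained sketch of the per-pipeline-scheduler approach being discussed (the schedulers and names here are hypothetical, not from the PR):

```python
import numpy as np

def next_dataset_name(weight_schedulers, step, rng):
    """Pick a dataset name with probability proportional to its scheduled weight."""
    names = list(weight_schedulers)
    weights = np.array(
        [scheduler(step) for scheduler in weight_schedulers.values()], dtype=float
    )
    probs = weights / weights.sum()
    return rng.choice(names, p=probs)

# Two datasets whose mixture shifts as training progresses.
schedulers = {
    "books": lambda step: max(1.0 - step / 1000, 0.1),
    "web": lambda step: min(step / 1000, 0.9),
}
rng = np.random.default_rng(0)
name = next_dataset_name(schedulers, step=0, rng=rng)  # all weight on "books" at step 0
```

The "master scheduler" alternative would just replace the dict of callables with one `scheduler(step) -> list[float]`; the sampling line stays the same.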
What does this PR do? Please describe:
A summary of the change or the issue that is fixed.
Fixes #{issue number}
Does your PR introduce any breaking changes? If yes, please list them:
List of all backwards-incompatible changes.
Check list: