@mahrsee1997
Collaborator

No description provided.

…arated the request & download stage of fetching
@mahrsee1997 mahrsee1997 requested a review from alxmrs April 20, 2022 20:29
Contributor

@alxmrs alxmrs left a comment

Overall, the pipeline improvements look good! Here's my early feedback for the current draft.

Comment on lines 117 to 121
def fetch(self, dataset: str, selection: t.Dict) -> None:
    pass

def download(self, dataset: str, result: t.Dict, output: str) -> None:
    pass
Contributor

These should raise a NotImplementedError.
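
A minimal sketch of this suggestion. The two method signatures come from the diff above; the class names `Client` and `IncompleteClient` are hypothetical, used only for illustration.

```python
import typing as t


class Client:
    """Hypothetical base class; only the method signatures come from the diff."""

    def fetch(self, dataset: str, selection: t.Dict) -> None:
        raise NotImplementedError()

    def download(self, dataset: str, result: t.Dict, output: str) -> None:
        raise NotImplementedError()


class IncompleteClient(Client):
    """Hypothetical subclass that overrides `fetch` but forgets `download`."""

    def fetch(self, dataset: str, selection: t.Dict) -> None:
        print(f'Fetching {dataset!r}.')
```

With `pass`, calling `IncompleteClient().download(...)` would silently do nothing; with `raise NotImplementedError()`, the missing override fails loudly at the call site.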

self._redirector.__exit__(exc_type, exc_value, traceback)


class APIRequestExtended(api.APIRequest):
Contributor

How about we include "MARS" in the name of this class?

Contributor

Naming discussion. I kind of like "extended", but is there a better name? What about "Adapter"? Any other ideas?

Contributor

On second thought, adding "MARS" to the name of the class may not be necessary.

Collaborator Author

Named SplitMARSRequest.

Comment on lines 174 to 175
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
Contributor

You can omit the constructor if it just calls super.
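
To illustrate why the constructor is redundant, here is a small sketch. `Base`, `WithRedundantInit`, and `WithoutInit` are hypothetical names, and the URL is a placeholder.

```python
class Base:
    def __init__(self, url: str, verbose: bool = False):
        self.url = url
        self.verbose = verbose


class WithRedundantInit(Base):
    # Only forwards its arguments, so it adds nothing over the inherited one.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class WithoutInit(Base):
    # No constructor defined: Python falls back to Base.__init__.
    pass


a = WithRedundantInit('https://example.invalid', verbose=True)
b = WithoutInit('https://example.invalid', verbose=True)
```

Both objects end up with the same attributes, and `WithoutInit.__init__` is literally `Base.__init__`.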

Comment on lines 209 to 219
tries = 0
while size != result["size"] and tries < 10:
    size = self._transfer(
        urljoin(self.url, result["href"]), target, result["size"]
    )
    if size != result["size"] and tries < 10:
        tries += 1
        self.log("Transfer interrupted, resuming in 60s...")
        time.sleep(60)
    else:
        break
Contributor

Ideally, we could update or replace this block with code that is capable of downloading more than 1 MB of data at a time. It'd be nice if we could do something like this, for example:

shutil.copyfileobj(source_file, dest_file, DEFAULT_READ_BUFFER_SIZE)
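
A runnable sketch of that call, for illustration only. The value of `DEFAULT_READ_BUFFER_SIZE` is an assumption (the thread does not state it), `copy_stream` is a hypothetical helper, and in-memory streams stand in for the HTTP response and the target file.

```python
import io
import shutil

# Assumed value for illustration; the real constant is not shown in this thread.
DEFAULT_READ_BUFFER_SIZE = 8 * 1024 * 1024  # 8 MiB


def copy_stream(source_file, dest_file, buffer_size=DEFAULT_READ_BUFFER_SIZE):
    """Copy between file-like objects, reading `buffer_size` bytes per chunk."""
    shutil.copyfileobj(source_file, dest_file, buffer_size)


# In-memory stand-ins for the remote source and the local destination.
source = io.BytesIO(b'x' * (3 * 1024 * 1024))
dest = io.BytesIO()
copy_stream(source, dest)
```

The third argument to `shutil.copyfileobj` is the chunk length, so raising it reduces the number of read/write round trips for large files.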

Collaborator Author

Yes, I've made the necessary code changes.

open(target, "w").close()

size = -1
tries = 0
Contributor

Following up with the comment below: We could use Beam's retry logic with exponential backoff (via the decorator) instead of re-using ECMWF's implementation. WDYT?
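
As a rough picture of the pattern being proposed, here is a standard-library sketch of a retry decorator with exponential backoff. It is a stand-in, not Beam's decorator: `with_backoff`, its parameters, and `flaky_transfer` are all hypothetical.

```python
import functools
import time


def with_backoff(num_retries=3, initial_delay_secs=1.0, factor=2.0, sleep=time.sleep):
    """Retry the wrapped function, multiplying the delay by `factor` after each failure."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            delay = initial_delay_secs
            for attempt in range(num_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == num_retries:
                        raise  # Out of retries: surface the last error.
                    sleep(delay)
                    delay *= factor
        return wrapper
    return decorator


delays = []  # Records the requested sleeps instead of actually waiting.
attempts = {'count': 0}


@with_backoff(num_retries=3, initial_delay_secs=1.0, sleep=delays.append)
def flaky_transfer():
    """Hypothetical transfer that fails twice before succeeding."""
    attempts['count'] += 1
    if attempts['count'] < 3:
        raise IOError('Transfer interrupted')
    return 'done'


result = flaky_transfer()
```

Compared with the fixed 60-second sleep in the loop above, the waits here grow after each failure, and the retry policy lives in one decorator rather than inside the transfer code.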

Collaborator Author

Yes, I've made the necessary code changes.

with self.manifest.transact(config.selection, target, config.user_id):
    with tempfile.NamedTemporaryFile() as temp:
        logger.info(f'[{worker_name}] Fetching data for {target!r}.')
        logger.info(f'[{worker_name}] Fetching and Downloading data for {target!r}.')
Contributor

petty nit: can you use an & instead of "and" here :) ?

with tempfile.NamedTemporaryFile() as temp:
    logger.info(f'[{worker_name}] Fetching data for {target!r}.')
    result = self.fetch(client, config.dataset, config.selection)
    yield (result, config, worker_name, temp.name, target)
Contributor

Consider creating an internal data class for passing along this result.

Also: Are you sure that using the temp.name is safe? It's possible that the temporary file will disappear.

Furthermore, looking at the code: do you need to create a temporary file here? If you don't need it for the fetch, it's probably safer to move this to the next stage.
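
The concern about `temp.name` can be shown with the standard library alone. In this sketch, `fetch_stage` and `download_stage` are hypothetical functions, not the PR's actual stages.

```python
import os
import tempfile


def fetch_stage():
    """Hypothetical first stage that leaks a temporary file's path."""
    with tempfile.NamedTemporaryFile() as temp:
        name = temp.name
        existed_inside = os.path.exists(name)
    # Leaving the `with` block closes the file, which also deletes it.
    return name, existed_inside


def download_stage():
    """Hypothetical second stage that creates and owns its temporary file."""
    with tempfile.NamedTemporaryFile() as temp:
        temp.write(b'payload')
        temp.flush()
        return os.path.getsize(temp.name)


leaked_name, existed_inside = fetch_stage()
exists_after = os.path.exists(leaked_name)
downloaded_size = download_stage()
```

The path is valid only while the `with` block is open, so a name handed to a later stage can point at a file that no longer exists. Creating the file in the stage that writes to it avoids the problem.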

Contributor

Maybe we don't need the dataclass, since we can probably simplify what's returned. For example, we don't need to pass the target, since that can be derived from the config. So, I see the tuple consisting of three parts: config, worker_name, result.

Contributor

(you can choose your favorite order for these).
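
Both options from this thread can be sketched together: a small container with the three fields, and the target derived from the config downstream. `FetchResult` is a hypothetical name, the dict-based config is a simplification, and this `prepare_target_name` is only a stand-in for the project helper of the same name.

```python
import dataclasses
import typing as t


@dataclasses.dataclass(frozen=True)
class FetchResult:
    """Hypothetical container for what the fetch stage passes downstream."""
    config: t.Any  # Stand-in for the project's config object.
    worker_name: str
    result: t.Dict


def prepare_target_name(config: t.Dict) -> str:
    """Hypothetical stand-in for the helper of the same name in the diff."""
    return config['target_template'].format(**config['selection'])


config = {'target_template': 'era5-{year}.nc', 'selection': {'year': '2020'}}
item = FetchResult(config=config, worker_name='worker-0',
                   result={'href': '/data/abc', 'size': 1024})

# The download stage re-derives the target instead of receiving it.
target = prepare_target_name(item.config)
```

A plain three-element tuple carries the same information; the dataclass mainly buys named fields, so call sites do not depend on element order.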

client = CLIENTS[self.client_name](config)
target = prepare_target_name(config)

with self.manifest.transact(config.selection, target, config.user_id):
Contributor

A complication that I didn't anticipate until now: It would probably be best if we updated the manifest to distinguish between retrieved and downloaded. WDYT?

Collaborator Author

@mahrsee1997 mahrsee1997 Apr 22, 2022

Made the necessary changes: added a new class variable named `stage` to `DownloadStatus`, which represents the current stage of the request.
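
One way such a `stage` field could look, as a sketch only. The thread confirms just that `DownloadStatus` gained a `stage`; the `Stage` enum, its values, the other fields, the `advance` helper, and the bucket path are all assumptions.

```python
import dataclasses
import enum
import typing as t


class Stage(enum.Enum):
    """Hypothetical stage names, mirroring the retrieve/download split."""
    RETRIEVE = 'retrieve'
    DOWNLOAD = 'download'


@dataclasses.dataclass
class DownloadStatus:
    """Simplified stand-in for the manifest record; the real class differs."""
    location: str
    stage: t.Optional[Stage] = None
    status: str = 'scheduled'


def advance(record: DownloadStatus, stage: Stage, status: str) -> DownloadStatus:
    """Return a copy of the record with an updated stage and status."""
    return dataclasses.replace(record, stage=stage, status=status)


record = DownloadStatus(location='gs://bucket/era5-2020.nc')
record = advance(record, Stage.RETRIEVE, 'success')
record = advance(record, Stage.DOWNLOAD, 'in-progress')
```

Recording the stage next to the status lets the manifest tell "retrieved but not yet downloaded" apart from "downloaded".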

@retry_with_exponential_backoff
def upload(self, src: str, dest: str) -> None:
    """Upload blob to cloud storage, with retries."""
    with io.FileIO(src, 'rb') as src_:
Contributor

can this be with open(src, 'rb')?
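
For reference, a short sketch comparing the two calls on a scratch file; the file name and contents are placeholders.

```python
import io
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, 'blob.bin')
    with open(src, 'wb') as f:
        f.write(b'abc')

    # io.FileIO is the raw, unbuffered layer.
    with io.FileIO(src, 'rb') as raw:
        raw_type = type(raw)
        raw_bytes = raw.read()

    # open(..., 'rb') wraps the same raw layer in a buffered reader.
    with open(src, 'rb') as buffered:
        buffered_type = type(buffered)
        buffered_bytes = buffered.read()
```

Both read the same bytes. `open(src, 'rb')` returns an `io.BufferedReader`, which is the more conventional spelling and adds read buffering on top of the raw file.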

'Default: make an educated guess per client & config. '
'Please see the client documentation for more details.')
parser.add_argument('-o', '--optimise-download', action='store_true', default=False,
                    help="Optimised the downloads.")
Contributor

Make sure you cross apply the description of what this does here.

@mahrsee1997 mahrsee1997 requested a review from alxmrs April 22, 2022 18:39
alxmrs added a commit that referenced this pull request Jan 4, 2023
I'm taking a leaf from @mahrsee1997's PR #148 so that we can copy data from the MARS server faster (using a larger buffer size). Thanks for the primary contribution here, Rahul.

* restructured the fetch stage to be a Composite Beam transform and separated the request & download stage of fetching

* retry logic of downloads for MARS client & other cosmetic changes.

* Remove fetch / dl split

* retrieve in two steps.

* rm fetch + dl methods.

* Fix: `num_requests_per_key` does not require class construction.

* fix lint: removed unused import.

* add support for aria2 for faster download

* code changes as per Alex feedback.

Co-authored-by: mahrsee1997 <rahul@infocusp.in>